]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_upload.py
move encode/upload/download/checker.py into a new immutable/ directory. No behavior...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_upload.py
1
2 import os
3 from twisted.trial import unittest
4 from twisted.python.failure import Failure
5 from twisted.python import log
6 from twisted.internet import defer
7 from cStringIO import StringIO
8
9 from allmydata import uri
10 from allmydata.immutable import upload, encode
11 from allmydata.interfaces import IFileURI, FileTooLargeError
12 from allmydata.util.assertutil import precondition
13 from allmydata.util.deferredutil import DeferredListShouldSucceed
14 from allmydata.util.testutil import ShouldFailMixin
15 from foolscap import eventual
16
17 MiB = 1024*1024
18
19 def extract_uri(results):
20     return results.uri
21
22 class Uploadable(unittest.TestCase):
23     def shouldEqual(self, data, expected):
24         self.failUnless(isinstance(data, list))
25         for e in data:
26             self.failUnless(isinstance(e, str))
27         s = "".join(data)
28         self.failUnlessEqual(s, expected)
29
30     def test_filehandle_random_key(self):
31         return self._test_filehandle(convergence=None)
32
33     def test_filehandle_convergent_encryption(self):
34         return self._test_filehandle(convergence="some convergence string")
35
36     def _test_filehandle(self, convergence):
37         s = StringIO("a"*41)
38         u = upload.FileHandle(s, convergence=convergence)
39         d = u.get_size()
40         d.addCallback(self.failUnlessEqual, 41)
41         d.addCallback(lambda res: u.read(1))
42         d.addCallback(self.shouldEqual, "a")
43         d.addCallback(lambda res: u.read(80))
44         d.addCallback(self.shouldEqual, "a"*40)
45         d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
46         d.addCallback(lambda res: s.close()) # that privilege is reserved for us
47         return d
48
49     def test_filename(self):
50         basedir = "upload/Uploadable/test_filename"
51         os.makedirs(basedir)
52         fn = os.path.join(basedir, "file")
53         f = open(fn, "w")
54         f.write("a"*41)
55         f.close()
56         u = upload.FileName(fn, convergence=None)
57         d = u.get_size()
58         d.addCallback(self.failUnlessEqual, 41)
59         d.addCallback(lambda res: u.read(1))
60         d.addCallback(self.shouldEqual, "a")
61         d.addCallback(lambda res: u.read(80))
62         d.addCallback(self.shouldEqual, "a"*40)
63         d.addCallback(lambda res: u.close())
64         return d
65
66     def test_data(self):
67         s = "a"*41
68         u = upload.Data(s, convergence=None)
69         d = u.get_size()
70         d.addCallback(self.failUnlessEqual, 41)
71         d.addCallback(lambda res: u.read(1))
72         d.addCallback(self.shouldEqual, "a")
73         d.addCallback(lambda res: u.read(80))
74         d.addCallback(self.shouldEqual, "a"*40)
75         d.addCallback(lambda res: u.close())
76         return d
77
78 class FakeStorageServer:
79     def __init__(self, mode):
80         self.mode = mode
81         self.allocated = []
82         self.queries = 0
83     def callRemote(self, methname, *args, **kwargs):
84         def _call():
85             meth = getattr(self, methname)
86             return meth(*args, **kwargs)
87         d = eventual.fireEventually()
88         d.addCallback(lambda res: _call())
89         return d
90
91     def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
92                          sharenums, share_size, canary):
93         #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
94         self.queries += 1
95         if self.mode == "full":
96             return (set(), {},)
97         elif self.mode == "already got them":
98             return (set(sharenums), {},)
99         else:
100             for shnum in sharenums:
101                 self.allocated.append( (storage_index, shnum) )
102             return (set(),
103                     dict([( shnum, FakeBucketWriter(share_size) )
104                           for shnum in sharenums]),
105                     )
106
107 class FakeBucketWriter:
108     # a diagnostic version of storageserver.BucketWriter
109     def __init__(self, size):
110         self.data = StringIO()
111         self.closed = False
112         self._size = size
113
114     def callRemote(self, methname, *args, **kwargs):
115         def _call():
116             meth = getattr(self, "remote_" + methname)
117             return meth(*args, **kwargs)
118         d = eventual.fireEventually()
119         d.addCallback(lambda res: _call())
120         return d
121
122     def remote_write(self, offset, data):
123         precondition(not self.closed)
124         precondition(offset >= 0)
125         precondition(offset+len(data) <= self._size,
126                      "offset=%d + data=%d > size=%d" %
127                      (offset, len(data), self._size))
128         self.data.seek(offset)
129         self.data.write(data)
130
131     def remote_close(self):
132         precondition(not self.closed)
133         self.closed = True
134
135     def remote_abort(self):
136         log.err("uh oh, I was asked to abort")
137
138 class FakeClient:
139     DEFAULT_ENCODING_PARAMETERS = {"k":25,
140                                    "happy": 75,
141                                    "n": 100,
142                                    "max_segment_size": 1*MiB,
143                                    }
144     def __init__(self, mode="good", num_servers=50):
145         self.mode = mode
146         self.num_servers = num_servers
147     def log(self, *args, **kwargs):
148         pass
149     def get_permuted_peers(self, storage_index, include_myself):
150         peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),)
151                   for fakeid in range(self.num_servers) ]
152         self.last_peers = [p[1] for p in peers]
153         return peers
154     def get_encoding_parameters(self):
155         return self.DEFAULT_ENCODING_PARAMETERS
156
157     def get_renewal_secret(self):
158         return ""
159     def get_cancel_secret(self):
160         return ""
161
162 class GiganticUploadable(upload.FileHandle):
163     def __init__(self, size):
164         self._size = size
165         self._fp = 0
166
167     def get_encryption_key(self):
168         return defer.succeed("\x00" * 16)
169     def get_size(self):
170         return defer.succeed(self._size)
171     def read(self, length):
172         left = self._size - self._fp
173         length = min(left, length)
174         self._fp += length
175         if self._fp > 1000000:
176             # terminate the test early.
177             raise RuntimeError("we shouldn't be allowed to get this far")
178         return defer.succeed(["\x00" * length])
179     def close(self):
180         pass
181
182 DATA = """
183 Once upon a time, there was a beautiful princess named Buttercup. She lived
184 in a magical land where every file was stored securely among millions of
185 machines, and nobody ever worried about their data being lost ever again.
186 The End.
187 """
188 assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
189
190 SIZE_ZERO = 0
191 SIZE_SMALL = 16
192 SIZE_LARGE = len(DATA)
193
194 def upload_data(uploader, data):
195     u = upload.Data(data, convergence=None)
196     return uploader.upload(u)
197 def upload_filename(uploader, filename):
198     u = upload.FileName(filename, convergence=None)
199     return uploader.upload(u)
200 def upload_filehandle(uploader, fh):
201     u = upload.FileHandle(fh, convergence=None)
202     return uploader.upload(u)
203
204 class GoodServer(unittest.TestCase, ShouldFailMixin):
205     def setUp(self):
206         self.node = FakeClient(mode="good")
207         self.u = upload.Uploader()
208         self.u.running = True
209         self.u.parent = self.node
210
211     def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
212         p = {"k": k,
213              "happy": happy,
214              "n": n,
215              "max_segment_size": max_segsize,
216              }
217         self.node.DEFAULT_ENCODING_PARAMETERS = p
218
219     def _check_small(self, newuri, size):
220         u = IFileURI(newuri)
221         self.failUnless(isinstance(u, uri.LiteralFileURI))
222         self.failUnlessEqual(len(u.data), size)
223
224     def _check_large(self, newuri, size):
225         u = IFileURI(newuri)
226         self.failUnless(isinstance(u, uri.CHKFileURI))
227         self.failUnless(isinstance(u.storage_index, str))
228         self.failUnlessEqual(len(u.storage_index), 16)
229         self.failUnless(isinstance(u.key, str))
230         self.failUnlessEqual(len(u.key), 16)
231         self.failUnlessEqual(u.size, size)
232
233     def get_data(self, size):
234         return DATA[:size]
235
236     def test_too_large(self):
237         # we currently impose a sizelimit on uploaded files, because of
238         # limitations in the share format (see ticket #346 for details). The
239         # limit is set to ensure that no share is larger than 4GiB. Make sure
240         # that we reject files larger than that.
241         k = 3; happy = 7; n = 10
242         self.set_encoding_parameters(k, happy, n)
243         data1 = GiganticUploadable(k*4*1024*1024*1024)
244         d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
245                             "This file is too large to be uploaded (data_size)",
246                             self.u.upload, data1)
247         data2 = GiganticUploadable(k*4*1024*1024*1024-3)
248         d.addCallback(lambda res:
249                       self.shouldFail(FileTooLargeError,
250                                       "test_too_large-data2",
251                                       "This file is too large to be uploaded (offsets)",
252                                       self.u.upload, data2))
253         # I don't know where the actual limit is.. it depends upon how large
254         # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
255         return d
256
257     def test_data_zero(self):
258         data = self.get_data(SIZE_ZERO)
259         d = upload_data(self.u, data)
260         d.addCallback(extract_uri)
261         d.addCallback(self._check_small, SIZE_ZERO)
262         return d
263
264     def test_data_small(self):
265         data = self.get_data(SIZE_SMALL)
266         d = upload_data(self.u, data)
267         d.addCallback(extract_uri)
268         d.addCallback(self._check_small, SIZE_SMALL)
269         return d
270
271     def test_data_large(self):
272         data = self.get_data(SIZE_LARGE)
273         d = upload_data(self.u, data)
274         d.addCallback(extract_uri)
275         d.addCallback(self._check_large, SIZE_LARGE)
276         return d
277
278     def test_data_large_odd_segments(self):
279         data = self.get_data(SIZE_LARGE)
280         segsize = int(SIZE_LARGE / 2.5)
281         # we want 3 segments, since that's not a power of two
282         self.set_encoding_parameters(25, 75, 100, segsize)
283         d = upload_data(self.u, data)
284         d.addCallback(extract_uri)
285         d.addCallback(self._check_large, SIZE_LARGE)
286         return d
287
288     def test_filehandle_zero(self):
289         data = self.get_data(SIZE_ZERO)
290         d = upload_filehandle(self.u, StringIO(data))
291         d.addCallback(extract_uri)
292         d.addCallback(self._check_small, SIZE_ZERO)
293         return d
294
295     def test_filehandle_small(self):
296         data = self.get_data(SIZE_SMALL)
297         d = upload_filehandle(self.u, StringIO(data))
298         d.addCallback(extract_uri)
299         d.addCallback(self._check_small, SIZE_SMALL)
300         return d
301
302     def test_filehandle_large(self):
303         data = self.get_data(SIZE_LARGE)
304         d = upload_filehandle(self.u, StringIO(data))
305         d.addCallback(extract_uri)
306         d.addCallback(self._check_large, SIZE_LARGE)
307         return d
308
309     def test_filename_zero(self):
310         fn = "Uploader-test_filename_zero.data"
311         f = open(fn, "wb")
312         data = self.get_data(SIZE_ZERO)
313         f.write(data)
314         f.close()
315         d = upload_filename(self.u, fn)
316         d.addCallback(extract_uri)
317         d.addCallback(self._check_small, SIZE_ZERO)
318         return d
319
320     def test_filename_small(self):
321         fn = "Uploader-test_filename_small.data"
322         f = open(fn, "wb")
323         data = self.get_data(SIZE_SMALL)
324         f.write(data)
325         f.close()
326         d = upload_filename(self.u, fn)
327         d.addCallback(extract_uri)
328         d.addCallback(self._check_small, SIZE_SMALL)
329         return d
330
331     def test_filename_large(self):
332         fn = "Uploader-test_filename_large.data"
333         f = open(fn, "wb")
334         data = self.get_data(SIZE_LARGE)
335         f.write(data)
336         f.close()
337         d = upload_filename(self.u, fn)
338         d.addCallback(extract_uri)
339         d.addCallback(self._check_large, SIZE_LARGE)
340         return d
341
342 class FullServer(unittest.TestCase):
343     def setUp(self):
344         self.node = FakeClient(mode="full")
345         self.u = upload.Uploader()
346         self.u.running = True
347         self.u.parent = self.node
348
349     def _should_fail(self, f):
350         self.failUnless(isinstance(f, Failure) and f.check(encode.NotEnoughSharesError), f)
351
352     def test_data_large(self):
353         data = DATA
354         d = upload_data(self.u, data)
355         d.addBoth(self._should_fail)
356         return d
357
358 class PeerSelection(unittest.TestCase):
359
360     def make_client(self, num_servers=50):
361         self.node = FakeClient(mode="good", num_servers=num_servers)
362         self.u = upload.Uploader()
363         self.u.running = True
364         self.u.parent = self.node
365
366     def get_data(self, size):
367         return DATA[:size]
368
369     def _check_large(self, newuri, size):
370         u = IFileURI(newuri)
371         self.failUnless(isinstance(u, uri.CHKFileURI))
372         self.failUnless(isinstance(u.storage_index, str))
373         self.failUnlessEqual(len(u.storage_index), 16)
374         self.failUnless(isinstance(u.key, str))
375         self.failUnlessEqual(len(u.key), 16)
376         self.failUnlessEqual(u.size, size)
377
378     def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
379         p = {"k": k,
380              "happy": happy,
381              "n": n,
382              "max_segment_size": max_segsize,
383              }
384         self.node.DEFAULT_ENCODING_PARAMETERS = p
385
386     def test_one_each(self):
387         # if we have 50 shares, and there are 50 peers, and they all accept a
388         # share, we should get exactly one share per peer
389
390         self.make_client()
391         data = self.get_data(SIZE_LARGE)
392         self.set_encoding_parameters(25, 30, 50)
393         d = upload_data(self.u, data)
394         d.addCallback(extract_uri)
395         d.addCallback(self._check_large, SIZE_LARGE)
396         def _check(res):
397             for p in self.node.last_peers:
398                 allocated = p.allocated
399                 self.failUnlessEqual(len(allocated), 1)
400                 self.failUnlessEqual(p.queries, 1)
401         d.addCallback(_check)
402         return d
403
404     def test_two_each(self):
405         # if we have 100 shares, and there are 50 peers, and they all accept
406         # all shares, we should get exactly two shares per peer
407
408         self.make_client()
409         data = self.get_data(SIZE_LARGE)
410         self.set_encoding_parameters(50, 75, 100)
411         d = upload_data(self.u, data)
412         d.addCallback(extract_uri)
413         d.addCallback(self._check_large, SIZE_LARGE)
414         def _check(res):
415             for p in self.node.last_peers:
416                 allocated = p.allocated
417                 self.failUnlessEqual(len(allocated), 2)
418                 self.failUnlessEqual(p.queries, 2)
419         d.addCallback(_check)
420         return d
421
422     def test_one_each_plus_one_extra(self):
423         # if we have 51 shares, and there are 50 peers, then one peer gets
424         # two shares and the rest get just one
425
426         self.make_client()
427         data = self.get_data(SIZE_LARGE)
428         self.set_encoding_parameters(24, 41, 51)
429         d = upload_data(self.u, data)
430         d.addCallback(extract_uri)
431         d.addCallback(self._check_large, SIZE_LARGE)
432         def _check(res):
433             got_one = []
434             got_two = []
435             for p in self.node.last_peers:
436                 allocated = p.allocated
437                 self.failUnless(len(allocated) in (1,2), len(allocated))
438                 if len(allocated) == 1:
439                     self.failUnlessEqual(p.queries, 1)
440                     got_one.append(p)
441                 else:
442                     self.failUnlessEqual(p.queries, 2)
443                     got_two.append(p)
444             self.failUnlessEqual(len(got_one), 49)
445             self.failUnlessEqual(len(got_two), 1)
446         d.addCallback(_check)
447         return d
448
449     def test_four_each(self):
450         # if we have 200 shares, and there are 50 peers, then each peer gets
451         # 4 shares. The design goal is to accomplish this with only two
452         # queries per peer.
453
454         self.make_client()
455         data = self.get_data(SIZE_LARGE)
456         self.set_encoding_parameters(100, 150, 200)
457         d = upload_data(self.u, data)
458         d.addCallback(extract_uri)
459         d.addCallback(self._check_large, SIZE_LARGE)
460         def _check(res):
461             for p in self.node.last_peers:
462                 allocated = p.allocated
463                 self.failUnlessEqual(len(allocated), 4)
464                 self.failUnlessEqual(p.queries, 2)
465         d.addCallback(_check)
466         return d
467
468     def test_three_of_ten(self):
469         # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
470         # 4+4+2
471
472         self.make_client(3)
473         data = self.get_data(SIZE_LARGE)
474         self.set_encoding_parameters(3, 5, 10)
475         d = upload_data(self.u, data)
476         d.addCallback(extract_uri)
477         d.addCallback(self._check_large, SIZE_LARGE)
478         def _check(res):
479             counts = {}
480             for p in self.node.last_peers:
481                 allocated = p.allocated
482                 counts[len(allocated)] = counts.get(len(allocated), 0) + 1
483             histogram = [counts.get(i, 0) for i in range(5)]
484             self.failUnlessEqual(histogram, [0,0,0,2,1])
485         d.addCallback(_check)
486         return d
487
488 class StorageIndex(unittest.TestCase):
489     def test_params_must_matter(self):
490         DATA = "I am some data"
491         u = upload.Data(DATA, convergence="")
492         eu = upload.EncryptAnUploadable(u)
493         d1 = eu.get_storage_index()
494
495         # CHK means the same data should encrypt the same way
496         u = upload.Data(DATA, convergence="")
497         eu = upload.EncryptAnUploadable(u)
498         d1a = eu.get_storage_index()
499
500         # but if we use a different convergence string it should be different
501         u = upload.Data(DATA, convergence="wheee!")
502         eu = upload.EncryptAnUploadable(u)
503         d1salt1 = eu.get_storage_index()
504
505         # and if we add yet a different convergence it should be different again
506         u = upload.Data(DATA, convergence="NOT wheee!")
507         eu = upload.EncryptAnUploadable(u)
508         d1salt2 = eu.get_storage_index()
509
510         # and if we use the first string again it should be the same as last time
511         u = upload.Data(DATA, convergence="wheee!")
512         eu = upload.EncryptAnUploadable(u)
513         d1salt1a = eu.get_storage_index()
514
515         # and if we change the encoding parameters, it should be different (from the same convergence string with different encoding parameters)
516         u = upload.Data(DATA, convergence="")
517         u.encoding_param_k = u.default_encoding_param_k + 1
518         eu = upload.EncryptAnUploadable(u)
519         d2 = eu.get_storage_index()
520
521         # and if we use a random key, it should be different than the CHK
522         u = upload.Data(DATA, convergence=None)
523         eu = upload.EncryptAnUploadable(u)
524         d3 = eu.get_storage_index()
525         # and different from another instance
526         u = upload.Data(DATA, convergence=None)
527         eu = upload.EncryptAnUploadable(u)
528         d4 = eu.get_storage_index()
529
530         d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
531         def _done(res):
532             si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
533             self.failUnlessEqual(si1, si1a)
534             self.failIfEqual(si1, si2)
535             self.failIfEqual(si1, si3)
536             self.failIfEqual(si1, si4)
537             self.failIfEqual(si3, si4)
538             self.failIfEqual(si1salt1, si1)
539             self.failIfEqual(si1salt1, si1salt2)
540             self.failIfEqual(si1salt2, si1)
541             self.failUnlessEqual(si1salt1, si1salt1a)
542         d.addCallback(_done)
543         return d
544
545
546 # TODO:
547 #  upload with exactly 75 peers (shares_of_happiness)
548 #  have a download fail
549 #  cancel a download (need to implement more cancel stuff)