
import os
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.python.failure import Failure
from twisted.python import log
from twisted.internet import defer
from foolscap import eventual

import allmydata # for __full_version__
from allmydata import uri, monitor
from allmydata.immutable import upload
from allmydata.interfaces import IFileURI, FileTooLargeError, NotEnoughSharesError
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import DeferredListShouldSucceed
from common import SystemTestMixin
from common_util import ShouldFailMixin

MiB = 1024*1024

def extract_uri(results):
    return results.uri

class Uploadable(unittest.TestCase):
    def shouldEqual(self, data, expected):
        self.failUnless(isinstance(data, list))
        for e in data:
            self.failUnless(isinstance(e, str))
        s = "".join(data)
        self.failUnlessEqual(s, expected)

    def test_filehandle_random_key(self):
        return self._test_filehandle(convergence=None)

    def test_filehandle_convergent_encryption(self):
        return self._test_filehandle(convergence="some convergence string")

    def _test_filehandle(self, convergence):
        s = StringIO("a"*41)
        u = upload.FileHandle(s, convergence=convergence)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
        d.addCallback(lambda res: s.close()) # that privilege is reserved for us
        return d

    def test_filename(self):
        basedir = "upload/Uploadable/test_filename"
        os.makedirs(basedir)
        fn = os.path.join(basedir, "file")
        f = open(fn, "w")
        f.write("a"*41)
        f.close()
        u = upload.FileName(fn, convergence=None)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close())
        return d

    def test_data(self):
        s = "a"*41
        u = upload.Data(s, convergence=None)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close())
        return d

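# All three tests above exercise the same IUploadable contract: get_size()
# and read() return Deferreds, and read(length) fires with a *list* of
# strings (which is why shouldEqual() joins its input). The following is a
# hedged sketch of a consumer loop built on that contract -- illustration
# only, not used by the tests, and it assumes that an empty list from
# read() signals EOF:

def example_read_everything(uploadable, chunksize=1000):
    pieces = []
    def _loop(data):
        if not data: # assumed EOF convention: no more strings to return
            return "".join(pieces)
        pieces.extend(data)
        return uploadable.read(chunksize).addCallback(_loop)
    return uploadable.read(chunksize).addCallback(_loop)
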
class FakeStorageServer:
    def __init__(self, mode):
        self.mode = mode
        self.allocated = []
        self.queries = 0
        self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                         { "maximum-immutable-share-size": 2**32 },
                         "application-version": str(allmydata.__full_version__),
                         }
        if mode == "small":
            self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                             { "maximum-immutable-share-size": 10 },
                             "application-version": str(allmydata.__full_version__),
                             }

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self, methname)
            return meth(*args, **kwargs)
        d = eventual.fireEventually()
        d.addCallback(lambda res: _call())
        return d

    def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
                         sharenums, share_size, canary):
        #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
        self.queries += 1
        if self.mode == "full":
            return (set(), {},)
        elif self.mode == "already got them":
            return (set(sharenums), {},)
        else:
            for shnum in sharenums:
                self.allocated.append( (storage_index, shnum) )
            return (set(),
                    dict([( shnum, FakeBucketWriter(share_size) )
                          for shnum in sharenums]),
                    )

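# allocate_buckets() above follows the storage-server convention of
# returning an (alreadygot, bucketwriters) tuple: 'alreadygot' is the set
# of share numbers this server already holds, and 'bucketwriters' maps
# each newly allocated share number to a writer object. A hedged sketch of
# how a caller might consume that tuple -- illustration only; the real
# upload logic lives in allmydata.immutable.upload:

def example_use_allocation(allocation, share_data):
    (alreadygot, bucketwriters) = allocation
    # shares in 'alreadygot' need no upload; write and close the rest
    d = defer.succeed(None)
    for shnum, bucket in bucketwriters.items():
        d.addCallback(lambda res, b=bucket: b.callRemote("write", 0, share_data))
        d.addCallback(lambda res, b=bucket: b.callRemote("close"))
    return d
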
class FakeBucketWriter:
    # a diagnostic version of storageserver.BucketWriter
    def __init__(self, size):
        self.data = StringIO()
        self.closed = False
        self._size = size

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self, "remote_" + methname)
            return meth(*args, **kwargs)
        d = eventual.fireEventually()
        d.addCallback(lambda res: _call())
        return d

    def remote_write(self, offset, data):
        precondition(not self.closed)
        precondition(offset >= 0)
        precondition(offset+len(data) <= self._size,
                     "offset=%d + data=%d > size=%d" %
                     (offset, len(data), self._size))
        self.data.seek(offset)
        self.data.write(data)

    def remote_close(self):
        precondition(not self.closed)
        self.closed = True

    def remote_abort(self):
        log.err("uh oh, I was asked to abort")

class FakeClient:
    DEFAULT_ENCODING_PARAMETERS = {"k":25,
                                   "happy": 75,
                                   "n": 100,
                                   "max_segment_size": 1*MiB,
                                   }
    def __init__(self, mode="good", num_servers=50):
        self.mode = mode
        self.num_servers = num_servers
        if mode == "some_big_some_small":
            self.peers = []
            for fakeid in range(num_servers):
                if fakeid % 2:
                    self.peers.append( ("%20d" % fakeid,
                                        FakeStorageServer("good")) )
                else:
                    self.peers.append( ("%20d" % fakeid,
                                        FakeStorageServer("small")) )
        else:
            self.peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),)
                           for fakeid in range(self.num_servers) ]
    def log(self, *args, **kwargs):
        pass
    def get_permuted_peers(self, storage_index, include_myself):
        self.last_peers = [p[1] for p in self.peers]
        return self.peers
    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS

    def get_renewal_secret(self):
        return ""
    def get_cancel_secret(self):
        return ""

class GiganticUploadable(upload.FileHandle):
    def __init__(self, size):
        self._size = size
        self._fp = 0

    def get_encryption_key(self):
        return defer.succeed("\x00" * 16)
    def get_size(self):
        return defer.succeed(self._size)
    def read(self, length):
        left = self._size - self._fp
        length = min(left, length)
        self._fp += length
        if self._fp > 1000000:
            # terminate the test early.
            raise RuntimeError("we shouldn't be allowed to get this far")
        return defer.succeed(["\x00" * length])
    def close(self):
        pass

DATA = """
Once upon a time, there was a beautiful princess named Buttercup. She lived
in a magical land where every file was stored securely among millions of
machines, and nobody ever worried about their data being lost ever again.
The End.
"""
assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD

SIZE_ZERO = 0
SIZE_SMALL = 16
SIZE_LARGE = len(DATA)

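# Files at or below URI_LIT_SIZE_THRESHOLD are stored inline as
# LiteralFileURIs rather than being encoded into shares, which is why the
# tests below expect a LiteralFileURI for SIZE_ZERO and SIZE_SMALL (via
# _check_small) and a CHKFileURI for SIZE_LARGE (via _check_large). The
# assert above guarantees that SIZE_LARGE really is over the threshold.
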
def upload_data(uploader, data):
    u = upload.Data(data, convergence=None)
    return uploader.upload(u)
def upload_filename(uploader, filename):
    u = upload.FileName(filename, convergence=None)
    return uploader.upload(u)
def upload_filehandle(uploader, fh):
    u = upload.FileHandle(fh, convergence=None)
    return uploader.upload(u)

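# Each helper above wraps its input in the matching IUploadable flavor and
# hands it to the uploader. The Deferred returned by uploader.upload()
# fires with an upload-results object whose .uri attribute is what
# extract_uri() (defined at the top of this file) pulls out, e.g.:
#
#   d = upload_data(self.u, "some bytes")
#   d.addCallback(extract_uri)
#   d.addCallback(self._check_small, len("some bytes"))
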
class GoodServer(unittest.TestCase, ShouldFailMixin):
    def setUp(self):
        self.node = FakeClient(mode="good")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        p = {"k": k,
             "happy": happy,
             "n": n,
             "max_segment_size": max_segsize,
             }
        self.node.DEFAULT_ENCODING_PARAMETERS = p

    def _check_small(self, newuri, size):
        u = IFileURI(newuri)
        self.failUnless(isinstance(u, uri.LiteralFileURI))
        self.failUnlessEqual(len(u.data), size)

    def _check_large(self, newuri, size):
        u = IFileURI(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.storage_index, str))
        self.failUnlessEqual(len(u.storage_index), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def get_data(self, size):
        return DATA[:size]

    def test_too_large(self):
        # The 4GiB share size limit has been removed (see ticket #346 for
        # details), but shares still use an 8-byte length field, so the
        # limit is now 2**64: make sure we reject files larger than that.
        k = 3; happy = 7; n = 10
        self.set_encoding_parameters(k, happy, n)
        big = k*(2**64)
        data1 = GiganticUploadable(big)
        d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
                            "This file is too large to be uploaded (data_size)",
                            self.u.upload, data1)
        data2 = GiganticUploadable(big-3)
        d.addCallback(lambda res:
                      self.shouldFail(FileTooLargeError,
                                      "test_too_large-data2",
                                      "This file is too large to be uploaded (offsets)",
                                      self.u.upload, data2))
        # I don't know where the actual limit is; it depends upon how large
        # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
        return d

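    # (Hedged reading of the two failure modes above: with k=3, a file of
    # size k*(2**64) gives each share 2**64 bytes of data, which overflows
    # the 8-byte data_size field outright; at big-3 each share's data just
    # fits, but the hash trees stored after the share data push the share
    # *offsets* past what the 8-byte field can express, so the "offsets"
    # check fires instead.)
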
    def test_data_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_data_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_data_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_data_large_odd_segments(self):
        data = self.get_data(SIZE_LARGE)
        segsize = int(SIZE_LARGE / 2.5)
        # we want 3 segments, since that's not a power of two
        self.set_encoding_parameters(25, 75, 100, segsize)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

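    # (For test_data_large_odd_segments: with segsize = int(SIZE_LARGE/2.5),
    # the file spans ceil(SIZE_LARGE/segsize) = 3 segments -- two full ones
    # plus a short tail -- so the encoder's non-power-of-two segment
    # handling gets exercised.)
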
    def test_filehandle_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_filehandle_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_filehandle_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_filename_zero(self):
        fn = "Uploader-test_filename_zero.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_ZERO)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_filename_small(self):
        fn = "Uploader-test_filename_small.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_SMALL)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_filename_large(self):
        fn = "Uploader-test_filename_large.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_LARGE)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

class FullServer(unittest.TestCase):
    def setUp(self):
        self.node = FakeClient(mode="full")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _should_fail(self, f):
        self.failUnless(isinstance(f, Failure) and f.check(NotEnoughSharesError), f)

    def test_data_large(self):
        data = DATA
        d = upload_data(self.u, data)
        d.addBoth(self._should_fail)
        return d

class PeerSelection(unittest.TestCase):

    def make_client(self, num_servers=50):
        self.node = FakeClient(mode="good", num_servers=num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def get_data(self, size):
        return DATA[:size]

    def _check_large(self, newuri, size):
        u = IFileURI(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.storage_index, str))
        self.failUnlessEqual(len(u.storage_index), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        p = {"k": k,
             "happy": happy,
             "n": n,
             "max_segment_size": max_segsize,
             }
        self.node.DEFAULT_ENCODING_PARAMETERS = p

    def test_one_each(self):
        # if we have 50 shares, and there are 50 peers, and they all accept a
        # share, we should get exactly one share per peer

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(25, 30, 50)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 1)
                self.failUnlessEqual(p.queries, 1)
        d.addCallback(_check)
        return d

    def test_two_each(self):
        # if we have 100 shares, and there are 50 peers, and they all accept
        # all shares, we should get exactly two shares per peer

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(50, 75, 100)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 2)
                self.failUnlessEqual(p.queries, 2)
        d.addCallback(_check)
        return d

    def test_one_each_plus_one_extra(self):
        # if we have 51 shares, and there are 50 peers, then one peer gets
        # two shares and the rest get just one

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(24, 41, 51)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            got_one = []
            got_two = []
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnless(len(allocated) in (1,2), len(allocated))
                if len(allocated) == 1:
                    self.failUnlessEqual(p.queries, 1)
                    got_one.append(p)
                else:
                    self.failUnlessEqual(p.queries, 2)
                    got_two.append(p)
            self.failUnlessEqual(len(got_one), 49)
            self.failUnlessEqual(len(got_two), 1)
        d.addCallback(_check)
        return d

    def test_four_each(self):
        # if we have 200 shares, and there are 50 peers, then each peer gets
        # 4 shares. The design goal is to accomplish this with only two
        # queries per peer.

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(100, 150, 200)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 4)
                self.failUnlessEqual(p.queries, 2)
        d.addCallback(_check)
        return d

    def test_three_of_ten(self):
        # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
        # 4+4+2

        self.make_client(3)
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 5, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            counts = {}
            for p in self.node.last_peers:
                allocated = p.allocated
                counts[len(allocated)] = counts.get(len(allocated), 0) + 1
            # histogram[i] is the number of servers holding exactly i
            # shares: we want two servers with 3 shares and one with 4
            histogram = [counts.get(i, 0) for i in range(5)]
            self.failUnlessEqual(histogram, [0,0,0,2,1])
        d.addCallback(_check)
        return d

    def test_some_big_some_small(self):
        # 10 shares, 20 servers, but half the servers don't support a
        # share-size large enough for our file
        self.node = FakeClient(mode="some_big_some_small", num_servers=20)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 5, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            # we should have put one share each on the big peers, and zero
            # shares on the small peers
            total_allocated = 0
            for p in self.node.last_peers:
                if p.mode == "good":
                    self.failUnlessEqual(len(p.allocated), 1)
                elif p.mode == "small":
                    self.failUnlessEqual(len(p.allocated), 0)
                total_allocated += len(p.allocated)
            self.failUnlessEqual(total_allocated, 10)
        d.addCallback(_check)
        return d

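# FakeClient.get_permuted_peers() returns its peer list unpermuted, which
# is what makes the share-placement counts in PeerSelection deterministic.
# The real client reorders servers per storage index; a hedged sketch of
# one common construction (illustrative only -- the exact hash and byte
# order allmydata uses may differ):
#
#   import hashlib
#   def permute_peers(peers, storage_index):
#       # sort (peerid, server) tuples by sha1(peerid + storage_index)
#       return sorted(peers,
#                     key=lambda p: hashlib.sha1(p[0] + storage_index).digest())
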
class StorageIndex(unittest.TestCase):
    def test_params_must_matter(self):
        DATA = "I am some data"
        u = upload.Data(DATA, convergence="")
        eu = upload.EncryptAnUploadable(u)
        d1 = eu.get_storage_index()

        # CHK means the same data should encrypt the same way
        u = upload.Data(DATA, convergence="")
        eu = upload.EncryptAnUploadable(u)
        d1a = eu.get_storage_index()

        # but if we use a different convergence string it should be different
        u = upload.Data(DATA, convergence="wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt1 = eu.get_storage_index()

        # and yet another convergence string should be different again
        u = upload.Data(DATA, convergence="NOT wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt2 = eu.get_storage_index()

        # and reusing the first string should give the same result as before
        u = upload.Data(DATA, convergence="wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt1a = eu.get_storage_index()

        # and if we change the encoding parameters, the result should differ
        # from that of the same convergence string with the original
        # encoding parameters
        u = upload.Data(DATA, convergence="")
        u.encoding_param_k = u.default_encoding_param_k + 1
        eu = upload.EncryptAnUploadable(u)
        d2 = eu.get_storage_index()

        # and if we use a random key, it should be different than the CHK
        u = upload.Data(DATA, convergence=None)
        eu = upload.EncryptAnUploadable(u)
        d3 = eu.get_storage_index()
        # and different from another instance
        u = upload.Data(DATA, convergence=None)
        eu = upload.EncryptAnUploadable(u)
        d4 = eu.get_storage_index()

        d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
        def _done(res):
            si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
            self.failUnlessEqual(si1, si1a)
            self.failIfEqual(si1, si2)
            self.failIfEqual(si1, si3)
            self.failIfEqual(si1, si4)
            self.failIfEqual(si3, si4)
            self.failIfEqual(si1salt1, si1)
            self.failIfEqual(si1salt1, si1salt2)
            self.failIfEqual(si1salt2, si1)
            self.failUnlessEqual(si1salt1, si1salt1a)
        d.addCallback(_done)
        return d

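# The assertions above pin down the convergent-encryption ("CHK") property:
# the storage index is a deterministic function of the convergence secret,
# the encoding parameters, and the file contents, while convergence=None
# picks a random key and so breaks that determinism. A hedged sketch of the
# general shape of such a derivation (illustrative only; allmydata's real
# construction uses its own tagged-hash scheme, not this exact formula):
#
#   key = hash("convergence" + secret + str(params) + plaintext)[:16]
#   storage_index = hash("storage-index" + key)[:16]
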
class EncodingParameters(SystemTestMixin, unittest.TestCase):
    def test_configure_parameters(self):
        self.basedir = self.mktemp()
        DATA = "data" * 100
        u = upload.Data(DATA, convergence="")
        d = self.set_up_nodes()
        d.addCallback(lambda res: self.clients[0].upload(u))
        d.addCallback(lambda ur: self.clients[0].create_node_from_uri(ur.uri))
        m = monitor.Monitor()
        d.addCallback(lambda fn: fn.check(m))
        def _check(cr):
            data = cr.get_data()
            self.failUnlessEqual(data["count-shares-needed"], 7)
            self.failUnlessEqual(data["count-shares-expected"], 12)
        d.addCallback(_check)
        return d

    def _set_up_nodes_extra_config(self):
        f = open(os.path.join(self.getdir("client0"), "tahoe.cfg"), "wt")
        f.write("\n")
        f.write("[client]\n")
        f.write("shares.needed = 7\n")
        f.write("shares.total = 12\n")
        f.write("\n")
        f.close()

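# _set_up_nodes_extra_config() leaves client0 with a tahoe.cfg containing:
#
#   [client]
#   shares.needed = 7
#   shares.total = 12
#
# which is what test_configure_parameters verifies through the checker
# results (count-shares-needed and count-shares-expected).
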
# TODO:
#  upload with exactly 75 peers (shares_of_happiness)
#  have a download fail
#  cancel a download (need to implement more cancel stuff)