import os
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.python.failure import Failure
from twisted.python import log
from twisted.internet import defer
from foolscap.api import fireEventually

import allmydata # for __full_version__
from allmydata import uri, monitor
from allmydata.immutable import upload
from allmydata.interfaces import IFileURI, FileTooLargeError, NotEnoughSharesError
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import DeferredListShouldSucceed
from no_network import GridTestMixin
from common_util import ShouldFailMixin
from allmydata.storage_client import StorageFarmBroker

MiB = 1024*1024

def extract_uri(results):
    return results.uri

# Some of these took longer than 480 seconds on Zandr's arm box, but this may
# have been due to an earlier test ERROR'ing out due to timeout, which seems
# to screw up subsequent tests.
timeout = 960

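# Exercise the IUploadable front-ends (FileHandle, FileName, Data) directly:
# each should report its size and serve reads correctly, without any grid.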
class Uploadable(unittest.TestCase):
    def shouldEqual(self, data, expected):
        self.failUnless(isinstance(data, list))
        for e in data:
            self.failUnless(isinstance(e, str))
        s = "".join(data)
        self.failUnlessEqual(s, expected)

    def test_filehandle_random_key(self):
        return self._test_filehandle(convergence=None)

    def test_filehandle_convergent_encryption(self):
        return self._test_filehandle(convergence="some convergence string")

    def _test_filehandle(self, convergence):
        s = StringIO("a"*41)
        u = upload.FileHandle(s, convergence=convergence)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
        d.addCallback(lambda res: s.close()) # that privilege is reserved for us
        return d

    def test_filename(self):
        basedir = "upload/Uploadable/test_filename"
        os.makedirs(basedir)
        fn = os.path.join(basedir, "file")
        f = open(fn, "w")
        f.write("a"*41)
        f.close()
        u = upload.FileName(fn, convergence=None)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close())
        return d

    def test_data(self):
        s = "a"*41
        u = upload.Data(s, convergence=None)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close())
        return d

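# A stub server that answers the storage/v1 version query and
# allocate_buckets according to its mode; "small" advertises a 10-byte
# maximum share size, so larger uploads should place no shares on it.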
class FakeStorageServer:
    def __init__(self, mode):
        self.mode = mode
        self.allocated = []
        self.queries = 0
        self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                         { "maximum-immutable-share-size": 2**32 },
                         "application-version": str(allmydata.__full_version__),
                         }
        if mode == "small":
            self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                             { "maximum-immutable-share-size": 10 },
                             "application-version": str(allmydata.__full_version__),
                             }


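    # fireEventually() returns a Deferred that fires in a later reactor turn,
    # so these fake remote calls stay asynchronous, like a real foolscap
    # callRemote.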
    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self, methname)
            return meth(*args, **kwargs)
        d = fireEventually()
        d.addCallback(lambda res: _call())
        return d

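    # The real allocate_buckets returns (alreadygot, bucketwriters): the set
    # of sharenums this server already holds, plus a dict of writers for the
    # newly allocated ones. "full" mode accepts nothing; "already got them"
    # claims to hold every requested share.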
    def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
                         sharenums, share_size, canary):
        #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
        self.queries += 1
        if self.mode == "full":
            return (set(), {},)
        elif self.mode == "already got them":
            return (set(sharenums), {},)
        else:
            for shnum in sharenums:
                self.allocated.append( (storage_index, shnum) )
            return (set(),
                    dict([( shnum, FakeBucketWriter(share_size) )
                          for shnum in sharenums]),
                    )

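# Accumulates writes in a StringIO and enforces the declared share size, so
# an encoder that writes past the end of its share trips a precondition.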
class FakeBucketWriter:
    # a diagnostic version of storageserver.BucketWriter
    def __init__(self, size):
        self.data = StringIO()
        self.closed = False
        self._size = size

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self, "remote_" + methname)
            return meth(*args, **kwargs)
        d = fireEventually()
        d.addCallback(lambda res: _call())
        return d

    def remote_write(self, offset, data):
        precondition(not self.closed)
        precondition(offset >= 0)
        precondition(offset+len(data) <= self._size,
                     "offset=%d + data=%d > size=%d" %
                     (offset, len(data), self._size))
        self.data.seek(offset)
        self.data.write(data)

    def remote_close(self):
        precondition(not self.closed)
        self.closed = True

    def remote_abort(self):
        log.err("uh oh, I was asked to abort")

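# Stands in for the client node: hands the uploader a StorageFarmBroker full
# of FakeStorageServers, plus encoding parameters and (empty) lease secrets.
# last_peers lets tests inspect which fake servers received allocations.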
class FakeClient:
    DEFAULT_ENCODING_PARAMETERS = {"k":25,
                                   "happy": 75,
                                   "n": 100,
                                   "max_segment_size": 1*MiB,
                                   }
    def __init__(self, mode="good", num_servers=50):
        self.mode = mode
        self.num_servers = num_servers
        if mode == "some_big_some_small":
            peers = []
            for fakeid in range(num_servers):
                if fakeid % 2:
                    peers.append(("%20d" % fakeid, FakeStorageServer("good")))
                else:
                    peers.append(("%20d" % fakeid, FakeStorageServer("small")))
        else:
            peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),)
                      for fakeid in range(self.num_servers) ]
        self.storage_broker = StorageFarmBroker()
        for (serverid, server) in peers:
            self.storage_broker.add_server(serverid, server)
        self.last_peers = [p[1] for p in peers]

    def log(self, *args, **kwargs):
        pass
    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS
    def get_storage_broker(self):
        return self.storage_broker

    def get_renewal_secret(self):
        return ""
    def get_cancel_secret(self):
        return ""

class GotTooFarError(Exception):
    pass

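# Pretends to hold `size` bytes of zeros, but raises GotTooFarError if more
# than ~1MB is actually read: uploads of absurd sizes must be rejected by the
# up-front size checks, long before any real encoding work happens.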
class GiganticUploadable(upload.FileHandle):
    def __init__(self, size):
        self._size = size
        self._fp = 0

    def get_encryption_key(self):
        return defer.succeed("\x00" * 16)
    def get_size(self):
        return defer.succeed(self._size)
    def read(self, length):
        left = self._size - self._fp
        length = min(left, length)
        self._fp += length
        if self._fp > 1000000:
            # terminate the test early.
            raise GotTooFarError("we shouldn't be allowed to get this far")
        return defer.succeed(["\x00" * length])
    def close(self):
        pass

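# Files at or below URI_LIT_SIZE_THRESHOLD are stored entirely inside a LIT
# URI rather than being erasure-coded, so DATA must be large enough to force
# a real CHK upload (the assert below guards against the threshold moving).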
DATA = """
Once upon a time, there was a beautiful princess named Buttercup. She lived
in a magical land where every file was stored securely among millions of
machines, and nobody ever worried about their data being lost ever again.
The End.
"""
assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD

SIZE_ZERO = 0
SIZE_SMALL = 16
SIZE_LARGE = len(DATA)

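# Convenience wrappers: each builds the matching Uploadable with a random
# per-file key (convergence=None) and hands it to the given Uploader.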
def upload_data(uploader, data):
    u = upload.Data(data, convergence=None)
    return uploader.upload(u)
def upload_filename(uploader, filename):
    u = upload.FileName(filename, convergence=None)
    return uploader.upload(u)
def upload_filehandle(uploader, fh):
    u = upload.FileHandle(fh, convergence=None)
    return uploader.upload(u)

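# Upload against a fake grid of 50 well-behaved servers: small files should
# come back as LIT URIs, larger ones as CHK URIs.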
class GoodServer(unittest.TestCase, ShouldFailMixin):
    def setUp(self):
        self.node = FakeClient(mode="good")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        p = {"k": k,
             "happy": happy,
             "n": n,
             "max_segment_size": max_segsize,
             }
        self.node.DEFAULT_ENCODING_PARAMETERS = p

    def _check_small(self, newuri, size):
        u = IFileURI(newuri)
        self.failUnless(isinstance(u, uri.LiteralFileURI))
        self.failUnlessEqual(len(u.data), size)

    def _check_large(self, newuri, size):
        u = IFileURI(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.storage_index, str))
        self.failUnlessEqual(len(u.storage_index), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def get_data(self, size):
        return DATA[:size]

    def test_too_large(self):
        # we've removed the 4GiB share size limit (see ticket #346 for
        # details), but we still have an 8-byte field, so the limit is now
        # 2**64; make sure we reject files larger than that.
        k = 3; happy = 7; n = 10
        self.set_encoding_parameters(k, happy, n)
        big = k*(2**64)
        data1 = GiganticUploadable(big)
        d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
                            "This file is too large to be uploaded (data_size)",
                            self.u.upload, data1)
        data2 = GiganticUploadable(big-3)
        d.addCallback(lambda res:
                      self.shouldFail(FileTooLargeError,
                                      "test_too_large-data2",
                                      "This file is too large to be uploaded (offsets)",
                                      self.u.upload, data2))
        # I don't know where the actual limit is... it depends upon how large
        # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
        return d

    def test_data_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_data_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_data_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_data_large_odd_segments(self):
        data = self.get_data(SIZE_LARGE)
        segsize = int(SIZE_LARGE / 2.5)
        # we want 3 segments, since that's not a power of two
        self.set_encoding_parameters(25, 75, 100, segsize)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_filehandle_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_filehandle_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_filehandle_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_filename_zero(self):
        fn = "Uploader-test_filename_zero.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_ZERO)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_filename_small(self):
        fn = "Uploader-test_filename_small.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_SMALL)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_filename_large(self):
        fn = "Uploader-test_filename_large.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_LARGE)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

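# Every server reports itself full, so no shares can be placed and the
# upload must fail with NotEnoughSharesError.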
class FullServer(unittest.TestCase):
    def setUp(self):
        self.node = FakeClient(mode="full")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _should_fail(self, f):
        self.failUnless(isinstance(f, Failure) and f.check(NotEnoughSharesError), f)

    def test_data_large(self):
        data = DATA
        d = upload_data(self.u, data)
        d.addBoth(self._should_fail)
        return d

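# Check how the uploader distributes shares across peers: placement should be
# as even as the share and server counts allow, with few queries per peer.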
class PeerSelection(unittest.TestCase):

    def make_client(self, num_servers=50):
        self.node = FakeClient(mode="good", num_servers=num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def get_data(self, size):
        return DATA[:size]

    def _check_large(self, newuri, size):
        u = IFileURI(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.storage_index, str))
        self.failUnlessEqual(len(u.storage_index), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        p = {"k": k,
             "happy": happy,
             "n": n,
             "max_segment_size": max_segsize,
             }
        self.node.DEFAULT_ENCODING_PARAMETERS = p

    def test_one_each(self):
        # if we have 50 shares, and there are 50 peers, and they all accept a
        # share, we should get exactly one share per peer

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(25, 30, 50)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 1)
                self.failUnlessEqual(p.queries, 1)
        d.addCallback(_check)
        return d

    def test_two_each(self):
        # if we have 100 shares, and there are 50 peers, and they all accept
        # all shares, we should get exactly two shares per peer

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(50, 75, 100)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 2)
                self.failUnlessEqual(p.queries, 2)
        d.addCallback(_check)
        return d

    def test_one_each_plus_one_extra(self):
        # if we have 51 shares, and there are 50 peers, then one peer gets
        # two shares and the rest get just one

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(24, 41, 51)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            got_one = []
            got_two = []
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnless(len(allocated) in (1,2), len(allocated))
                if len(allocated) == 1:
                    self.failUnlessEqual(p.queries, 1)
                    got_one.append(p)
                else:
                    self.failUnlessEqual(p.queries, 2)
                    got_two.append(p)
            self.failUnlessEqual(len(got_one), 49)
            self.failUnlessEqual(len(got_two), 1)
        d.addCallback(_check)
        return d

    def test_four_each(self):
        # if we have 200 shares, and there are 50 peers, then each peer gets
        # 4 shares. The design goal is to accomplish this with only two
        # queries per peer.

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(100, 150, 200)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 4)
                self.failUnlessEqual(p.queries, 2)
        d.addCallback(_check)
        return d

    def test_three_of_ten(self):
        # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
        # 4+4+2

        self.make_client(3)
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 5, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            counts = {}
            for p in self.node.last_peers:
                allocated = p.allocated
                counts[len(allocated)] = counts.get(len(allocated), 0) + 1
            histogram = [counts.get(i, 0) for i in range(5)]
            self.failUnlessEqual(histogram, [0,0,0,2,1])
        d.addCallback(_check)
        return d

    def test_some_big_some_small(self):
        # 10 shares, 20 servers, but half the servers don't support a
        # share-size large enough for our file
        self.node = FakeClient(mode="some_big_some_small", num_servers=20)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 5, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            # we should have put one share each on the big peers, and zero
            # shares on the small peers
            total_allocated = 0
            for p in self.node.last_peers:
                if p.mode == "good":
                    self.failUnlessEqual(len(p.allocated), 1)
                elif p.mode == "small":
                    self.failUnlessEqual(len(p.allocated), 0)
                total_allocated += len(p.allocated)
            self.failUnlessEqual(total_allocated, 10)
        d.addCallback(_check)
        return d


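# The storage index is derived from the encryption key. For convergent
# encryption (CHK) the key depends on the file contents, the convergence
# secret, and the encoding parameters, so changing any of those must change
# the storage index; a random key (convergence=None) must differ every time.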
class StorageIndex(unittest.TestCase):
    def test_params_must_matter(self):
        DATA = "I am some data"
        u = upload.Data(DATA, convergence="")
        eu = upload.EncryptAnUploadable(u)
        d1 = eu.get_storage_index()

        # CHK means the same data should encrypt the same way
        u = upload.Data(DATA, convergence="")
        eu = upload.EncryptAnUploadable(u)
        d1a = eu.get_storage_index()

        # but if we use a different convergence string it should be different
        u = upload.Data(DATA, convergence="wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt1 = eu.get_storage_index()

        # and if we add yet a different convergence it should be different again
        u = upload.Data(DATA, convergence="NOT wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt2 = eu.get_storage_index()

        # and if we use the first string again it should be the same as last time
        u = upload.Data(DATA, convergence="wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt1a = eu.get_storage_index()

        # and if we change the encoding parameters, it should be different
        # (from the same convergence string with different encoding parameters)
        u = upload.Data(DATA, convergence="")
        u.encoding_param_k = u.default_encoding_param_k + 1
        eu = upload.EncryptAnUploadable(u)
        d2 = eu.get_storage_index()

        # and if we use a random key, it should be different than the CHK
        u = upload.Data(DATA, convergence=None)
        eu = upload.EncryptAnUploadable(u)
        d3 = eu.get_storage_index()
        # and different from another instance
        u = upload.Data(DATA, convergence=None)
        eu = upload.EncryptAnUploadable(u)
        d4 = eu.get_storage_index()

        d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
        def _done(res):
            si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
            self.failUnlessEqual(si1, si1a)
            self.failIfEqual(si1, si2)
            self.failIfEqual(si1, si3)
            self.failIfEqual(si1, si4)
            self.failIfEqual(si3, si4)
            self.failIfEqual(si1salt1, si1)
            self.failIfEqual(si1salt1, si1salt2)
            self.failIfEqual(si1salt2, si1)
            self.failUnlessEqual(si1salt1, si1salt1a)
        d.addCallback(_done)
        return d

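# Run against a real in-process grid (GridTestMixin) to confirm that
# shares.needed/shares.total in tahoe.cfg override the built-in encoding
# defaults: the checker results should report 7-of-12 encoding.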
class EncodingParameters(GridTestMixin, unittest.TestCase):
    def test_configure_parameters(self):
        self.basedir = self.mktemp()
        hooks = {0: self._set_up_nodes_extra_config}
        self.set_up_grid(client_config_hooks=hooks)
        c0 = self.g.clients[0]

        DATA = "data" * 100
        u = upload.Data(DATA, convergence="")
        d = c0.upload(u)
        d.addCallback(lambda ur: c0.create_node_from_uri(ur.uri))
        m = monitor.Monitor()
        d.addCallback(lambda fn: fn.check(m))
        def _check(cr):
            data = cr.get_data()
            self.failUnlessEqual(data["count-shares-needed"], 7)
            self.failUnlessEqual(data["count-shares-expected"], 12)
        d.addCallback(_check)
        return d

    def _set_up_nodes_extra_config(self, clientdir):
        cfgfn = os.path.join(clientdir, "tahoe.cfg")
        oldcfg = open(cfgfn, "r").read()
        f = open(cfgfn, "wt")
        f.write(oldcfg)
        f.write("\n")
        f.write("[client]\n")
        f.write("shares.needed = 7\n")
        f.write("shares.total = 12\n")
        f.write("\n")
        f.close()
        return None

# TODO:
#  upload with exactly 75 peers (shares_of_happiness)
#  have a download fail
#  cancel a download (need to implement more cancel stuff)