
import os
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.python.failure import Failure
from twisted.python import log
from twisted.internet import defer
from foolscap.api import fireEventually

import allmydata # for __full_version__
from allmydata import uri, monitor
from allmydata.immutable import upload
from allmydata.interfaces import IFileURI, FileTooLargeError, NotEnoughSharesError
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import DeferredListShouldSucceed
from no_network import GridTestMixin
from common_util import ShouldFailMixin
from allmydata.storage_client import StorageFarmBroker

MiB = 1024*1024

def extract_uri(results):
    return results.uri

class Uploadable(unittest.TestCase):
    def shouldEqual(self, data, expected):
        self.failUnless(isinstance(data, list))
        for e in data:
            self.failUnless(isinstance(e, str))
        s = "".join(data)
        self.failUnlessEqual(s, expected)

    def test_filehandle_random_key(self):
        return self._test_filehandle(convergence=None)

    def test_filehandle_convergent_encryption(self):
        return self._test_filehandle(convergence="some convergence string")

    def _test_filehandle(self, convergence):
        s = StringIO("a"*41)
        u = upload.FileHandle(s, convergence=convergence)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
        d.addCallback(lambda res: s.close()) # that privilege is reserved for us
        return d

    def test_filename(self):
        basedir = "upload/Uploadable/test_filename"
        os.makedirs(basedir)
        fn = os.path.join(basedir, "file")
        f = open(fn, "w")
        f.write("a"*41)
        f.close()
        u = upload.FileName(fn, convergence=None)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close())
        return d

    def test_data(self):
        s = "a"*41
        u = upload.Data(s, convergence=None)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close())
        return d

class FakeStorageServer:
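    # a stand-in for a remote storage server, implementing just enough of
    # the storage protocol for these tests. 'mode' controls how
    # allocate_buckets responds: "full" servers accept no shares, "already
    # got them" servers claim to hold every share already, "small" servers
    # advertise a tiny maximum-immutable-share-size, and anything else
    # accepts all shares.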
    def __init__(self, mode):
        self.mode = mode
        self.allocated = []
        self.queries = 0
        self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                         { "maximum-immutable-share-size": 2**32 },
                         "application-version": str(allmydata.__full_version__),
                         }
        if mode == "small":
            self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                             { "maximum-immutable-share-size": 10 },
                             "application-version": str(allmydata.__full_version__),
                             }


    def callRemote(self, methname, *args, **kwargs):
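        # dispatch to a local method of the same name, but via
        # fireEventually, so the answer arrives in a later reactor turn,
        # just as a real remote call would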
        def _call():
            meth = getattr(self, methname)
            return meth(*args, **kwargs)
        d = fireEventually()
        d.addCallback(lambda res: _call())
        return d

    def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
                         sharenums, share_size, canary):
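        # returns (alreadygot, bucketwriters): the set of sharenums we
        # claim to already hold, plus a dict mapping each accepted sharenum
        # to a writer for it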
        #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
        self.queries += 1
        if self.mode == "full":
            return (set(), {},)
        elif self.mode == "already got them":
            return (set(sharenums), {},)
        else:
            for shnum in sharenums:
                self.allocated.append( (storage_index, shnum) )
            return (set(),
                    dict([( shnum, FakeBucketWriter(share_size) )
                          for shnum in sharenums]),
                    )

class FakeBucketWriter:
    # a diagnostic version of storageserver.BucketWriter
    def __init__(self, size):
        self.data = StringIO()
        self.closed = False
        self._size = size

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self, "remote_" + methname)
            return meth(*args, **kwargs)
        d = fireEventually()
        d.addCallback(lambda res: _call())
        return d

    def remote_write(self, offset, data):
        precondition(not self.closed)
        precondition(offset >= 0)
        precondition(offset+len(data) <= self._size,
                     "offset=%d + data=%d > size=%d" %
                     (offset, len(data), self._size))
        self.data.seek(offset)
        self.data.write(data)

    def remote_close(self):
        precondition(not self.closed)
        self.closed = True

    def remote_abort(self):
        log.err("uh oh, I was asked to abort")

class FakeClient:
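    # a stand-in for the client node: provides just enough of the real
    # Client interface (encoding parameters, storage broker, lease
    # secrets) to drive an Uploader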
    DEFAULT_ENCODING_PARAMETERS = {"k":25,
                                   "happy": 75,
                                   "n": 100,
                                   "max_segment_size": 1*MiB,
                                   }
    def __init__(self, mode="good", num_servers=50):
        self.mode = mode
        self.num_servers = num_servers
        if mode == "some_big_some_small":
            peers = []
            for fakeid in range(num_servers):
                if fakeid % 2:
                    peers.append(("%20d" % fakeid, FakeStorageServer("good")))
                else:
                    peers.append(("%20d" % fakeid, FakeStorageServer("small")))
        else:
            peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),)
                           for fakeid in range(self.num_servers) ]
        self.storage_broker = StorageFarmBroker()
        for (serverid, server) in peers:
            self.storage_broker.add_server(serverid, server)
        self.last_peers = [p[1] for p in peers]

    def log(self, *args, **kwargs):
        pass
    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS
    def get_storage_broker(self):
        return self.storage_broker

    def get_renewal_secret(self):
        return ""
    def get_cancel_secret(self):
        return ""

class GotTooFarError(Exception):
    pass

class GiganticUploadable(upload.FileHandle):
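    # an Uploadable that pretends to be enormous without keeping any data
    # in memory: read() synthesizes zero bytes on demand, and raises
    # GotTooFarError if the upload isn't rejected before reading 1MB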
    def __init__(self, size):
        self._size = size
        self._fp = 0

    def get_encryption_key(self):
        return defer.succeed("\x00" * 16)
    def get_size(self):
        return defer.succeed(self._size)
    def read(self, length):
        left = self._size - self._fp
        length = min(left, length)
        self._fp += length
        if self._fp > 1000000:
            # terminate the test early.
            raise GotTooFarError("we shouldn't be allowed to get this far")
        return defer.succeed(["\x00" * length])
    def close(self):
        pass

DATA = """
Once upon a time, there was a beautiful princess named Buttercup. She lived
in a magical land where every file was stored securely among millions of
machines, and nobody ever worried about their data being lost ever again.
The End.
"""
assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
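# DATA must be long enough to be stored as a CHK file: anything at or below
# URI_LIT_SIZE_THRESHOLD would be packed directly into a LIT URI instead of
# being uploaded to servers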

SIZE_ZERO = 0
SIZE_SMALL = 16
SIZE_LARGE = len(DATA)

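# convenience wrappers: upload the same bytes through each of the three
# Uploadable flavors (in-memory Data, a named file, an open filehandle)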
def upload_data(uploader, data):
    u = upload.Data(data, convergence=None)
    return uploader.upload(u)
def upload_filename(uploader, filename):
    u = upload.FileName(filename, convergence=None)
    return uploader.upload(u)
def upload_filehandle(uploader, fh):
    u = upload.FileHandle(fh, convergence=None)
    return uploader.upload(u)

class GoodServer(unittest.TestCase, ShouldFailMixin):
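    # upload files of various sizes through a grid of cooperative fake
    # servers, and check that small files come back as LIT URIs while
    # large files come back as CHK URIs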
    def setUp(self):
        self.node = FakeClient(mode="good")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        p = {"k": k,
             "happy": happy,
             "n": n,
             "max_segment_size": max_segsize,
             }
        self.node.DEFAULT_ENCODING_PARAMETERS = p

    def _check_small(self, newuri, size):
        u = IFileURI(newuri)
        self.failUnless(isinstance(u, uri.LiteralFileURI))
        self.failUnlessEqual(len(u.data), size)

    def _check_large(self, newuri, size):
        u = IFileURI(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.storage_index, str))
        self.failUnlessEqual(len(u.storage_index), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def get_data(self, size):
        return DATA[:size]

    def test_too_large(self):
        # we've removed the 4GiB share size limit (see ticket #346 for
        # details), but we still have an 8-byte field for share offsets,
        # so the limit is now 2**64; make sure we reject files larger
        # than that.
        k = 3; happy = 7; n = 10
        self.set_encoding_parameters(k, happy, n)
        big = k*(2**64)
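        # with k=3, a file of k*(2**64) bytes implies roughly 2**64 bytes
        # per share, which no longer fits in those 8-byte fields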
        data1 = GiganticUploadable(big)
        d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
                            "This file is too large to be uploaded (data_size)",
                            self.u.upload, data1)
        data2 = GiganticUploadable(big-3)
        d.addCallback(lambda res:
                      self.shouldFail(FileTooLargeError,
                                      "test_too_large-data2",
                                      "This file is too large to be uploaded (offsets)",
                                      self.u.upload, data2))
        # I don't know where the actual limit is; it depends upon how large
        # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
        return d

    def test_data_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_data_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_data_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_data_large_odd_segments(self):
        data = self.get_data(SIZE_LARGE)
        segsize = int(SIZE_LARGE / 2.5)
        # we want 3 segments, since that's not a power of two
        self.set_encoding_parameters(25, 75, 100, segsize)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_filehandle_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_filehandle_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_filehandle_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_filename_zero(self):
        fn = "Uploader-test_filename_zero.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_ZERO)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_filename_small(self):
        fn = "Uploader-test_filename_small.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_SMALL)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_filename_large(self):
        fn = "Uploader-test_filename_large.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_LARGE)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

class FullServer(unittest.TestCase):
    def setUp(self):
        self.node = FakeClient(mode="full")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _should_fail(self, f):
        self.failUnless(isinstance(f, Failure) and f.check(NotEnoughSharesError), f)

    def test_data_large(self):
        data = DATA
        d = upload_data(self.u, data)
        d.addBoth(self._should_fail)
        return d

class PeerSelection(unittest.TestCase):
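    # exercise the peer-selection algorithm: shares should be spread as
    # evenly as possible across the available servers, using as few
    # queries as possible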

    def make_client(self, num_servers=50):
        self.node = FakeClient(mode="good", num_servers=num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def get_data(self, size):
        return DATA[:size]

    def _check_large(self, newuri, size):
        u = IFileURI(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.storage_index, str))
        self.failUnlessEqual(len(u.storage_index), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        p = {"k": k,
             "happy": happy,
             "n": n,
             "max_segment_size": max_segsize,
             }
        self.node.DEFAULT_ENCODING_PARAMETERS = p

    def test_one_each(self):
        # if we have 50 shares, and there are 50 peers, and they all accept a
        # share, we should get exactly one share per peer

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(25, 30, 50)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 1)
                self.failUnlessEqual(p.queries, 1)
        d.addCallback(_check)
        return d

    def test_two_each(self):
        # if we have 100 shares, and there are 50 peers, and they all accept
        # all shares, we should get exactly two shares per peer

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(50, 75, 100)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 2)
                self.failUnlessEqual(p.queries, 2)
        d.addCallback(_check)
        return d

    def test_one_each_plus_one_extra(self):
        # if we have 51 shares, and there are 50 peers, then one peer gets
        # two shares and the rest get just one

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(24, 41, 51)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            got_one = []
            got_two = []
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnless(len(allocated) in (1,2), len(allocated))
                if len(allocated) == 1:
                    self.failUnlessEqual(p.queries, 1)
                    got_one.append(p)
                else:
                    self.failUnlessEqual(p.queries, 2)
                    got_two.append(p)
            self.failUnlessEqual(len(got_one), 49)
            self.failUnlessEqual(len(got_two), 1)
        d.addCallback(_check)
        return d

    def test_four_each(self):
        # if we have 200 shares, and there are 50 peers, then each peer gets
        # 4 shares. The design goal is to accomplish this with only two
        # queries per peer.

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(100, 150, 200)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 4)
                self.failUnlessEqual(p.queries, 2)
        d.addCallback(_check)
        return d

    def test_three_of_ten(self):
        # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
        # 4+4+2

        self.make_client(3)
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 5, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            counts = {}
            for p in self.node.last_peers:
                allocated = p.allocated
                counts[len(allocated)] = counts.get(len(allocated), 0) + 1
            histogram = [counts.get(i, 0) for i in range(5)]
            self.failUnlessEqual(histogram, [0,0,0,2,1])
        d.addCallback(_check)
        return d

    def test_some_big_some_small(self):
        # 10 shares, 20 servers, but half the servers don't support a
        # share-size large enough for our file
        self.node = FakeClient(mode="some_big_some_small", num_servers=20)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 5, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            # we should have put one share each on the big peers, and zero
            # shares on the small peers
            total_allocated = 0
            for p in self.node.last_peers:
                if p.mode == "good":
                    self.failUnlessEqual(len(p.allocated), 1)
                elif p.mode == "small":
                    self.failUnlessEqual(len(p.allocated), 0)
                total_allocated += len(p.allocated)
            self.failUnlessEqual(total_allocated, 10)
        d.addCallback(_check)
        return d


class StorageIndex(unittest.TestCase):
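    # the storage index is derived from the encryption key, so anything
    # that changes the key (file contents, convergence secret, encoding
    # parameters, or using a random key) must change the storage index too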
    def test_params_must_matter(self):
        DATA = "I am some data"
        u = upload.Data(DATA, convergence="")
        eu = upload.EncryptAnUploadable(u)
        d1 = eu.get_storage_index()

        # CHK means the same data should encrypt the same way
        u = upload.Data(DATA, convergence="")
        eu = upload.EncryptAnUploadable(u)
        d1a = eu.get_storage_index()

        # but if we use a different convergence string it should be different
        u = upload.Data(DATA, convergence="wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt1 = eu.get_storage_index()

        # and if we add yet a different convergence it should be different again
        u = upload.Data(DATA, convergence="NOT wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt2 = eu.get_storage_index()

        # and if we use the first string again it should be the same as last time
        u = upload.Data(DATA, convergence="wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt1a = eu.get_storage_index()

        # and if we change the encoding parameters, it should be different
        # (from the same convergence string with different encoding
        # parameters)
        u = upload.Data(DATA, convergence="")
        u.encoding_param_k = u.default_encoding_param_k + 1
        eu = upload.EncryptAnUploadable(u)
        d2 = eu.get_storage_index()

        # and if we use a random key, it should be different than the CHK
        u = upload.Data(DATA, convergence=None)
        eu = upload.EncryptAnUploadable(u)
        d3 = eu.get_storage_index()
        # and different from another instance
        u = upload.Data(DATA, convergence=None)
        eu = upload.EncryptAnUploadable(u)
        d4 = eu.get_storage_index()

        d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
        def _done(res):
            si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
            self.failUnlessEqual(si1, si1a)
            self.failIfEqual(si1, si2)
            self.failIfEqual(si1, si3)
            self.failIfEqual(si1, si4)
            self.failIfEqual(si3, si4)
            self.failIfEqual(si1salt1, si1)
            self.failIfEqual(si1salt1, si1salt2)
            self.failIfEqual(si1salt2, si1)
            self.failUnlessEqual(si1salt1, si1salt1a)
        d.addCallback(_done)
        return d

class EncodingParameters(GridTestMixin, unittest.TestCase):
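    # verify that shares.needed and shares.total from tahoe.cfg (written
    # by _set_up_nodes_extra_config below) override the built-in encoding
    # defaults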
    def test_configure_parameters(self):
        self.basedir = self.mktemp()
        hooks = {0: self._set_up_nodes_extra_config}
        self.set_up_grid(client_config_hooks=hooks)
        c0 = self.g.clients[0]

        DATA = "data" * 100
        u = upload.Data(DATA, convergence="")
        d = c0.upload(u)
        d.addCallback(lambda ur: c0.create_node_from_uri(ur.uri))
        m = monitor.Monitor()
        d.addCallback(lambda fn: fn.check(m))
        def _check(cr):
            data = cr.get_data()
            self.failUnlessEqual(data["count-shares-needed"], 7)
            self.failUnlessEqual(data["count-shares-expected"], 12)
        d.addCallback(_check)
        return d

    def _set_up_nodes_extra_config(self, clientdir):
        cfgfn = os.path.join(clientdir, "tahoe.cfg")
        oldcfg = open(cfgfn, "r").read()
        f = open(cfgfn, "wt")
        f.write(oldcfg)
        f.write("\n")
        f.write("[client]\n")
        f.write("shares.needed = 7\n")
        f.write("shares.total = 12\n")
        f.write("\n")
        f.close()
        return None

# TODO:
#  upload with exactly 75 peers (shares_of_happiness)
#  have a download fail
#  cancel a download (need to implement more cancel stuff)