3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.python.failure import Failure
6 from twisted.python import log
7 from twisted.internet import defer
8 from foolscap.api import fireEventually
10 import allmydata # for __full_version__
11 from allmydata import uri, monitor, client
12 from allmydata.immutable import upload
13 from allmydata.interfaces import IFileURI, FileTooLargeError, NoSharesError, \
15 from allmydata.util.assertutil import precondition
16 from allmydata.util.deferredutil import DeferredListShouldSucceed
17 from no_network import GridTestMixin
18 from common_util import ShouldFailMixin
19 from allmydata.storage_client import StorageFarmBroker
23 def extract_uri(results):
26 # Some of these took longer than 480 seconds on Zandr's arm box, but this may
27 # have been due to an earlier test ERROR'ing out due to timeout, which seems
28 # to screw up subsequent tests.
31 class Uploadable(unittest.TestCase):
32 def shouldEqual(self, data, expected):
33 self.failUnless(isinstance(data, list))
35 self.failUnless(isinstance(e, str))
37 self.failUnlessEqual(s, expected)
39 def test_filehandle_random_key(self):
40 return self._test_filehandle(convergence=None)
42 def test_filehandle_convergent_encryption(self):
43 return self._test_filehandle(convergence="some convergence string")
45 def _test_filehandle(self, convergence):
47 u = upload.FileHandle(s, convergence=convergence)
49 d.addCallback(self.failUnlessEqual, 41)
50 d.addCallback(lambda res: u.read(1))
51 d.addCallback(self.shouldEqual, "a")
52 d.addCallback(lambda res: u.read(80))
53 d.addCallback(self.shouldEqual, "a"*40)
54 d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
55 d.addCallback(lambda res: s.close()) # that privilege is reserved for us
58 def test_filename(self):
59 basedir = "upload/Uploadable/test_filename"
61 fn = os.path.join(basedir, "file")
65 u = upload.FileName(fn, convergence=None)
67 d.addCallback(self.failUnlessEqual, 41)
68 d.addCallback(lambda res: u.read(1))
69 d.addCallback(self.shouldEqual, "a")
70 d.addCallback(lambda res: u.read(80))
71 d.addCallback(self.shouldEqual, "a"*40)
72 d.addCallback(lambda res: u.close())
77 u = upload.Data(s, convergence=None)
79 d.addCallback(self.failUnlessEqual, 41)
80 d.addCallback(lambda res: u.read(1))
81 d.addCallback(self.shouldEqual, "a")
82 d.addCallback(lambda res: u.read(80))
83 d.addCallback(self.shouldEqual, "a"*40)
84 d.addCallback(lambda res: u.close())
87 class ServerError(Exception):
90 class FakeStorageServer:
91 def __init__(self, mode):
95 self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
96 { "maximum-immutable-share-size": 2**32 },
97 "application-version": str(allmydata.__full_version__),
100 self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
101 { "maximum-immutable-share-size": 10 },
102 "application-version": str(allmydata.__full_version__),
106 def callRemote(self, methname, *args, **kwargs):
108 meth = getattr(self, methname)
109 return meth(*args, **kwargs)
111 d.addCallback(lambda res: _call())
114 def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
115 sharenums, share_size, canary):
116 #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
117 if self.mode == "first-fail":
118 if self.queries == 0:
120 if self.mode == "second-fail":
121 if self.queries == 1:
124 if self.mode == "full":
126 elif self.mode == "already got them":
127 return (set(sharenums), {},)
129 for shnum in sharenums:
130 self.allocated.append( (storage_index, shnum) )
132 dict([( shnum, FakeBucketWriter(share_size) )
133 for shnum in sharenums]),
136 class FakeBucketWriter:
137 # a diagnostic version of storageserver.BucketWriter
138 def __init__(self, size):
139 self.data = StringIO()
143 def callRemote(self, methname, *args, **kwargs):
145 meth = getattr(self, "remote_" + methname)
146 return meth(*args, **kwargs)
148 d.addCallback(lambda res: _call())
151 def remote_write(self, offset, data):
152 precondition(not self.closed)
153 precondition(offset >= 0)
154 precondition(offset+len(data) <= self._size,
155 "offset=%d + data=%d > size=%d" %
156 (offset, len(data), self._size))
157 self.data.seek(offset)
158 self.data.write(data)
160 def remote_close(self):
161 precondition(not self.closed)
164 def remote_abort(self):
165 log.err("uh oh, I was asked to abort")
168 DEFAULT_ENCODING_PARAMETERS = {"k":25,
171 "max_segment_size": 1*MiB,
173 def __init__(self, mode="good", num_servers=50):
174 self.num_servers = num_servers
175 if type(mode) is str:
176 mode = dict([i,mode] for i in range(num_servers))
177 peers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
178 for fakeid in range(self.num_servers) ]
179 self.storage_broker = StorageFarmBroker(None, permute_peers=True)
180 for (serverid, server) in peers:
181 self.storage_broker.test_add_server(serverid, server)
182 self.last_peers = [p[1] for p in peers]
184 def log(self, *args, **kwargs):
186 def get_encoding_parameters(self):
187 return self.DEFAULT_ENCODING_PARAMETERS
188 def get_storage_broker(self):
189 return self.storage_broker
190 _secret_holder = client.SecretHolder("lease secret")
192 class GotTooFarError(Exception):
195 class GiganticUploadable(upload.FileHandle):
196 def __init__(self, size):
200 def get_encryption_key(self):
201 return defer.succeed("\x00" * 16)
203 return defer.succeed(self._size)
204 def read(self, length):
205 left = self._size - self._fp
206 length = min(left, length)
208 if self._fp > 1000000:
209 # terminate the test early.
210 raise GotTooFarError("we shouldn't be allowed to get this far")
211 return defer.succeed(["\x00" * length])
216 Once upon a time, there was a beautiful princess named Buttercup. She lived
217 in a magical land where every file was stored securely among millions of
218 machines, and nobody ever worried about their data being lost ever again.
221 assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
225 SIZE_LARGE = len(DATA)
def upload_data(uploader, data):
    """Upload an in-memory string via a non-convergent upload.Data."""
    u = upload.Data(data, convergence=None)
    return uploader.upload(u)
def upload_filename(uploader, filename):
    """Upload the named file via a non-convergent upload.FileName."""
    u = upload.FileName(filename, convergence=None)
    return uploader.upload(u)
def upload_filehandle(uploader, fh):
    """Upload an open filehandle via a non-convergent upload.FileHandle."""
    u = upload.FileHandle(fh, convergence=None)
    return uploader.upload(u)
237 class GoodServer(unittest.TestCase, ShouldFailMixin):
239 self.node = FakeClient(mode="good")
240 self.u = upload.Uploader()
241 self.u.running = True
242 self.u.parent = self.node
244 def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
248 "max_segment_size": max_segsize,
250 self.node.DEFAULT_ENCODING_PARAMETERS = p
252 def _check_small(self, newuri, size):
254 self.failUnless(isinstance(u, uri.LiteralFileURI))
255 self.failUnlessEqual(len(u.data), size)
257 def _check_large(self, newuri, size):
259 self.failUnless(isinstance(u, uri.CHKFileURI))
260 self.failUnless(isinstance(u.storage_index, str))
261 self.failUnlessEqual(len(u.storage_index), 16)
262 self.failUnless(isinstance(u.key, str))
263 self.failUnlessEqual(len(u.key), 16)
264 self.failUnlessEqual(u.size, size)
266 def get_data(self, size):
269 def test_too_large(self):
270 # we've removed the 4GiB share size limit (see ticket #346 for
271 # details), but still have an 8-byte field, so the limit is now
272 # 2**64, so make sure we reject files larger than that.
273 k = 3; happy = 7; n = 10
274 self.set_encoding_parameters(k, happy, n)
276 data1 = GiganticUploadable(big)
277 d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
278 "This file is too large to be uploaded (data_size)",
279 self.u.upload, data1)
280 data2 = GiganticUploadable(big-3)
281 d.addCallback(lambda res:
282 self.shouldFail(FileTooLargeError,
283 "test_too_large-data2",
284 "This file is too large to be uploaded (offsets)",
285 self.u.upload, data2))
286 # I don't know where the actual limit is.. it depends upon how large
287 # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
290 def test_data_zero(self):
291 data = self.get_data(SIZE_ZERO)
292 d = upload_data(self.u, data)
293 d.addCallback(extract_uri)
294 d.addCallback(self._check_small, SIZE_ZERO)
297 def test_data_small(self):
298 data = self.get_data(SIZE_SMALL)
299 d = upload_data(self.u, data)
300 d.addCallback(extract_uri)
301 d.addCallback(self._check_small, SIZE_SMALL)
304 def test_data_large(self):
305 data = self.get_data(SIZE_LARGE)
306 d = upload_data(self.u, data)
307 d.addCallback(extract_uri)
308 d.addCallback(self._check_large, SIZE_LARGE)
311 def test_data_large_odd_segments(self):
312 data = self.get_data(SIZE_LARGE)
313 segsize = int(SIZE_LARGE / 2.5)
314 # we want 3 segments, since that's not a power of two
315 self.set_encoding_parameters(25, 75, 100, segsize)
316 d = upload_data(self.u, data)
317 d.addCallback(extract_uri)
318 d.addCallback(self._check_large, SIZE_LARGE)
321 def test_filehandle_zero(self):
322 data = self.get_data(SIZE_ZERO)
323 d = upload_filehandle(self.u, StringIO(data))
324 d.addCallback(extract_uri)
325 d.addCallback(self._check_small, SIZE_ZERO)
328 def test_filehandle_small(self):
329 data = self.get_data(SIZE_SMALL)
330 d = upload_filehandle(self.u, StringIO(data))
331 d.addCallback(extract_uri)
332 d.addCallback(self._check_small, SIZE_SMALL)
335 def test_filehandle_large(self):
336 data = self.get_data(SIZE_LARGE)
337 d = upload_filehandle(self.u, StringIO(data))
338 d.addCallback(extract_uri)
339 d.addCallback(self._check_large, SIZE_LARGE)
342 def test_filename_zero(self):
343 fn = "Uploader-test_filename_zero.data"
345 data = self.get_data(SIZE_ZERO)
348 d = upload_filename(self.u, fn)
349 d.addCallback(extract_uri)
350 d.addCallback(self._check_small, SIZE_ZERO)
353 def test_filename_small(self):
354 fn = "Uploader-test_filename_small.data"
356 data = self.get_data(SIZE_SMALL)
359 d = upload_filename(self.u, fn)
360 d.addCallback(extract_uri)
361 d.addCallback(self._check_small, SIZE_SMALL)
364 def test_filename_large(self):
365 fn = "Uploader-test_filename_large.data"
367 data = self.get_data(SIZE_LARGE)
370 d = upload_filename(self.u, fn)
371 d.addCallback(extract_uri)
372 d.addCallback(self._check_large, SIZE_LARGE)
375 class ServerErrors(unittest.TestCase, ShouldFailMixin):
376 def make_node(self, mode, num_servers=10):
377 self.node = FakeClient(mode, num_servers)
378 self.u = upload.Uploader()
379 self.u.running = True
380 self.u.parent = self.node
382 def _check_large(self, newuri, size):
384 self.failUnless(isinstance(u, uri.CHKFileURI))
385 self.failUnless(isinstance(u.storage_index, str))
386 self.failUnlessEqual(len(u.storage_index), 16)
387 self.failUnless(isinstance(u.key, str))
388 self.failUnlessEqual(len(u.key), 16)
389 self.failUnlessEqual(u.size, size)
391 def test_first_error(self):
392 mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
394 d = upload_data(self.u, DATA)
395 d.addCallback(extract_uri)
396 d.addCallback(self._check_large, SIZE_LARGE)
399 def test_first_error_all(self):
400 self.make_node("first-fail")
401 d = self.shouldFail(NoSharesError, "first_error_all",
402 "peer selection failed",
403 upload_data, self.u, DATA)
405 self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
406 # there should also be a 'last failure was' message
407 self.failUnlessIn("ServerError", str(f.value))
408 d.addCallback(_check)
411 def test_second_error(self):
412 # we want to make sure we make it to a third pass. This means that
413 # the first pass was insufficient to place all shares, and at least
414 # one of second pass servers (other than the last one) accepted a
415 # share (so we'll believe that a third pass will be useful). (if
416 # everyone but the last server throws an error, then we'll send all
417 # the remaining shares to the last server at the end of the second
418 # pass, and if that succeeds, we won't make it to a third pass).
420 # we can achieve this 97.5% of the time by using 40 servers, having
421 # 39 of them fail on the second request, leaving only one to succeed
422 # on the second request. (we need to keep the number of servers low
423 # enough to ensure a second pass with 100 shares).
424 mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
425 self.make_node(mode, 40)
426 d = upload_data(self.u, DATA)
427 d.addCallback(extract_uri)
428 d.addCallback(self._check_large, SIZE_LARGE)
431 def test_second_error_all(self):
432 self.make_node("second-fail")
433 d = self.shouldFail(NotEnoughSharesError, "second_error_all",
434 "peer selection failed",
435 upload_data, self.u, DATA)
437 self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
438 # there should also be a 'last failure was' message
439 self.failUnlessIn("ServerError", str(f.value))
440 d.addCallback(_check)
443 class FullServer(unittest.TestCase):
445 self.node = FakeClient(mode="full")
446 self.u = upload.Uploader()
447 self.u.running = True
448 self.u.parent = self.node
450 def _should_fail(self, f):
451 self.failUnless(isinstance(f, Failure) and f.check(NoSharesError), f)
453 def test_data_large(self):
455 d = upload_data(self.u, data)
456 d.addBoth(self._should_fail)
459 class PeerSelection(unittest.TestCase):
461 def make_client(self, num_servers=50):
462 self.node = FakeClient(mode="good", num_servers=num_servers)
463 self.u = upload.Uploader()
464 self.u.running = True
465 self.u.parent = self.node
467 def get_data(self, size):
470 def _check_large(self, newuri, size):
472 self.failUnless(isinstance(u, uri.CHKFileURI))
473 self.failUnless(isinstance(u.storage_index, str))
474 self.failUnlessEqual(len(u.storage_index), 16)
475 self.failUnless(isinstance(u.key, str))
476 self.failUnlessEqual(len(u.key), 16)
477 self.failUnlessEqual(u.size, size)
479 def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
483 "max_segment_size": max_segsize,
485 self.node.DEFAULT_ENCODING_PARAMETERS = p
487 def test_one_each(self):
488 # if we have 50 shares, and there are 50 peers, and they all accept a
489 # share, we should get exactly one share per peer
492 data = self.get_data(SIZE_LARGE)
493 self.set_encoding_parameters(25, 30, 50)
494 d = upload_data(self.u, data)
495 d.addCallback(extract_uri)
496 d.addCallback(self._check_large, SIZE_LARGE)
498 for p in self.node.last_peers:
499 allocated = p.allocated
500 self.failUnlessEqual(len(allocated), 1)
501 self.failUnlessEqual(p.queries, 1)
502 d.addCallback(_check)
505 def test_two_each(self):
506 # if we have 100 shares, and there are 50 peers, and they all accept
507 # all shares, we should get exactly two shares per peer
510 data = self.get_data(SIZE_LARGE)
511 self.set_encoding_parameters(50, 75, 100)
512 d = upload_data(self.u, data)
513 d.addCallback(extract_uri)
514 d.addCallback(self._check_large, SIZE_LARGE)
516 for p in self.node.last_peers:
517 allocated = p.allocated
518 self.failUnlessEqual(len(allocated), 2)
519 self.failUnlessEqual(p.queries, 2)
520 d.addCallback(_check)
523 def test_one_each_plus_one_extra(self):
524 # if we have 51 shares, and there are 50 peers, then one peer gets
525 # two shares and the rest get just one
528 data = self.get_data(SIZE_LARGE)
529 self.set_encoding_parameters(24, 41, 51)
530 d = upload_data(self.u, data)
531 d.addCallback(extract_uri)
532 d.addCallback(self._check_large, SIZE_LARGE)
536 for p in self.node.last_peers:
537 allocated = p.allocated
538 self.failUnless(len(allocated) in (1,2), len(allocated))
539 if len(allocated) == 1:
540 self.failUnlessEqual(p.queries, 1)
543 self.failUnlessEqual(p.queries, 2)
545 self.failUnlessEqual(len(got_one), 49)
546 self.failUnlessEqual(len(got_two), 1)
547 d.addCallback(_check)
550 def test_four_each(self):
551 # if we have 200 shares, and there are 50 peers, then each peer gets
552 # 4 shares. The design goal is to accomplish this with only two
556 data = self.get_data(SIZE_LARGE)
557 self.set_encoding_parameters(100, 150, 200)
558 d = upload_data(self.u, data)
559 d.addCallback(extract_uri)
560 d.addCallback(self._check_large, SIZE_LARGE)
562 for p in self.node.last_peers:
563 allocated = p.allocated
564 self.failUnlessEqual(len(allocated), 4)
565 self.failUnlessEqual(p.queries, 2)
566 d.addCallback(_check)
569 def test_three_of_ten(self):
570 # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
574 data = self.get_data(SIZE_LARGE)
575 self.set_encoding_parameters(3, 5, 10)
576 d = upload_data(self.u, data)
577 d.addCallback(extract_uri)
578 d.addCallback(self._check_large, SIZE_LARGE)
581 for p in self.node.last_peers:
582 allocated = p.allocated
583 counts[len(allocated)] = counts.get(len(allocated), 0) + 1
584 histogram = [counts.get(i, 0) for i in range(5)]
585 self.failUnlessEqual(histogram, [0,0,0,2,1])
586 d.addCallback(_check)
589 def test_some_big_some_small(self):
590 # 10 shares, 20 servers, but half the servers don't support a
591 # share-size large enough for our file
592 mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)])
593 self.node = FakeClient(mode, num_servers=20)
594 self.u = upload.Uploader()
595 self.u.running = True
596 self.u.parent = self.node
598 data = self.get_data(SIZE_LARGE)
599 self.set_encoding_parameters(3, 5, 10)
600 d = upload_data(self.u, data)
601 d.addCallback(extract_uri)
602 d.addCallback(self._check_large, SIZE_LARGE)
604 # we should have put one share each on the big peers, and zero
605 # shares on the small peers
607 for p in self.node.last_peers:
609 self.failUnlessEqual(len(p.allocated), 1)
610 elif p.mode == "small":
611 self.failUnlessEqual(len(p.allocated), 0)
612 total_allocated += len(p.allocated)
613 self.failUnlessEqual(total_allocated, 10)
614 d.addCallback(_check)
618 class StorageIndex(unittest.TestCase):
619 def test_params_must_matter(self):
620 DATA = "I am some data"
621 u = upload.Data(DATA, convergence="")
622 eu = upload.EncryptAnUploadable(u)
623 d1 = eu.get_storage_index()
625 # CHK means the same data should encrypt the same way
626 u = upload.Data(DATA, convergence="")
627 eu = upload.EncryptAnUploadable(u)
628 d1a = eu.get_storage_index()
630 # but if we use a different convergence string it should be different
631 u = upload.Data(DATA, convergence="wheee!")
632 eu = upload.EncryptAnUploadable(u)
633 d1salt1 = eu.get_storage_index()
635 # and if we add yet a different convergence it should be different again
636 u = upload.Data(DATA, convergence="NOT wheee!")
637 eu = upload.EncryptAnUploadable(u)
638 d1salt2 = eu.get_storage_index()
640 # and if we use the first string again it should be the same as last time
641 u = upload.Data(DATA, convergence="wheee!")
642 eu = upload.EncryptAnUploadable(u)
643 d1salt1a = eu.get_storage_index()
645 # and if we change the encoding parameters, it should be different (from the same convergence string with different encoding parameters)
646 u = upload.Data(DATA, convergence="")
647 u.encoding_param_k = u.default_encoding_param_k + 1
648 eu = upload.EncryptAnUploadable(u)
649 d2 = eu.get_storage_index()
651 # and if we use a random key, it should be different than the CHK
652 u = upload.Data(DATA, convergence=None)
653 eu = upload.EncryptAnUploadable(u)
654 d3 = eu.get_storage_index()
655 # and different from another instance
656 u = upload.Data(DATA, convergence=None)
657 eu = upload.EncryptAnUploadable(u)
658 d4 = eu.get_storage_index()
660 d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
662 si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
663 self.failUnlessEqual(si1, si1a)
664 self.failIfEqual(si1, si2)
665 self.failIfEqual(si1, si3)
666 self.failIfEqual(si1, si4)
667 self.failIfEqual(si3, si4)
668 self.failIfEqual(si1salt1, si1)
669 self.failIfEqual(si1salt1, si1salt2)
670 self.failIfEqual(si1salt2, si1)
671 self.failUnlessEqual(si1salt1, si1salt1a)
675 class EncodingParameters(GridTestMixin, unittest.TestCase):
676 def test_configure_parameters(self):
677 self.basedir = self.mktemp()
678 hooks = {0: self._set_up_nodes_extra_config}
679 self.set_up_grid(client_config_hooks=hooks)
680 c0 = self.g.clients[0]
683 u = upload.Data(DATA, convergence="")
685 d.addCallback(lambda ur: c0.create_node_from_uri(ur.uri))
686 m = monitor.Monitor()
687 d.addCallback(lambda fn: fn.check(m))
690 self.failUnlessEqual(data["count-shares-needed"], 7)
691 self.failUnlessEqual(data["count-shares-expected"], 12)
692 d.addCallback(_check)
695 def _set_up_nodes_extra_config(self, clientdir):
696 cfgfn = os.path.join(clientdir, "tahoe.cfg")
697 oldcfg = open(cfgfn, "r").read()
698 f = open(cfgfn, "wt")
701 f.write("[client]\n")
702 f.write("shares.needed = 7\n")
703 f.write("shares.total = 12\n")
709 # upload with exactly 75 peers (shares_of_happiness)
710 # have a download fail
711 # cancel a download (need to implement more cancel stuff)