1 # -*- coding: utf-8 -*-
4 from cStringIO import StringIO
5 from twisted.trial import unittest
6 from twisted.python.failure import Failure
7 from twisted.internet import defer
8 from foolscap.api import fireEventually
10 import allmydata # for __full_version__
11 from allmydata import uri, monitor, client
12 from allmydata.immutable import upload, encode
13 from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
14 from allmydata.util.assertutil import precondition
15 from allmydata.util.deferredutil import DeferredListShouldSucceed
16 from allmydata.test.no_network import GridTestMixin
17 from allmydata.test.common_util import ShouldFailMixin
18 from allmydata.util.happinessutil import servers_of_happiness, \
19 shares_by_server, merge_peers
20 from allmydata.storage_client import StorageFarmBroker
21 from allmydata.storage.server import storage_index_to_dir
25 def extract_uri(results):
28 # Some of these took longer than 480 seconds on Zandr's arm box, but this may
29 # have been due to an earlier test ERROR'ing out due to timeout, which seems
30 # to screw up subsequent tests.
33 class Uploadable(unittest.TestCase):
34 def shouldEqual(self, data, expected):
35 self.failUnless(isinstance(data, list))
37 self.failUnless(isinstance(e, str))
39 self.failUnlessEqual(s, expected)
41 def test_filehandle_random_key(self):
42 return self._test_filehandle(convergence=None)
44 def test_filehandle_convergent_encryption(self):
45 return self._test_filehandle(convergence="some convergence string")
47 def _test_filehandle(self, convergence):
49 u = upload.FileHandle(s, convergence=convergence)
51 d.addCallback(self.failUnlessEqual, 41)
52 d.addCallback(lambda res: u.read(1))
53 d.addCallback(self.shouldEqual, "a")
54 d.addCallback(lambda res: u.read(80))
55 d.addCallback(self.shouldEqual, "a"*40)
56 d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
57 d.addCallback(lambda res: s.close()) # that privilege is reserved for us
60 def test_filename(self):
61 basedir = "upload/Uploadable/test_filename"
63 fn = os.path.join(basedir, "file")
67 u = upload.FileName(fn, convergence=None)
69 d.addCallback(self.failUnlessEqual, 41)
70 d.addCallback(lambda res: u.read(1))
71 d.addCallback(self.shouldEqual, "a")
72 d.addCallback(lambda res: u.read(80))
73 d.addCallback(self.shouldEqual, "a"*40)
74 d.addCallback(lambda res: u.close())
79 u = upload.Data(s, convergence=None)
81 d.addCallback(self.failUnlessEqual, 41)
82 d.addCallback(lambda res: u.read(1))
83 d.addCallback(self.shouldEqual, "a")
84 d.addCallback(lambda res: u.read(80))
85 d.addCallback(self.shouldEqual, "a"*40)
86 d.addCallback(lambda res: u.close())
89 class ServerError(Exception):
93 def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
97 "max_segment_size": max_segsize,
99 self.node.DEFAULT_ENCODING_PARAMETERS = p
101 class FakeStorageServer:
102 def __init__(self, mode):
106 self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
107 { "maximum-immutable-share-size": 2**32 },
108 "application-version": str(allmydata.__full_version__),
111 self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
112 { "maximum-immutable-share-size": 10 },
113 "application-version": str(allmydata.__full_version__),
117 def callRemote(self, methname, *args, **kwargs):
119 meth = getattr(self, methname)
120 return meth(*args, **kwargs)
122 d.addCallback(lambda res: _call())
125 def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
126 sharenums, share_size, canary):
127 #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
128 if self.mode == "first-fail":
129 if self.queries == 0:
131 if self.mode == "second-fail":
132 if self.queries == 1:
135 if self.mode == "full":
137 elif self.mode == "already got them":
138 return (set(sharenums), {},)
140 for shnum in sharenums:
141 self.allocated.append( (storage_index, shnum) )
143 dict([( shnum, FakeBucketWriter(share_size) )
144 for shnum in sharenums]),
147 class FakeBucketWriter:
148 # a diagnostic version of storageserver.BucketWriter
149 def __init__(self, size):
150 self.data = StringIO()
154 def callRemote(self, methname, *args, **kwargs):
156 meth = getattr(self, "remote_" + methname)
157 return meth(*args, **kwargs)
159 d.addCallback(lambda res: _call())
163 def callRemoteOnly(self, methname, *args, **kwargs):
164 d = self.callRemote(methname, *args, **kwargs)
165 del d # callRemoteOnly ignores this
169 def remote_write(self, offset, data):
170 precondition(not self.closed)
171 precondition(offset >= 0)
172 precondition(offset+len(data) <= self._size,
173 "offset=%d + data=%d > size=%d" %
174 (offset, len(data), self._size))
175 self.data.seek(offset)
176 self.data.write(data)
178 def remote_close(self):
179 precondition(not self.closed)
182 def remote_abort(self):
186 DEFAULT_ENCODING_PARAMETERS = {"k":25,
189 "max_segment_size": 1*MiB,
191 def __init__(self, mode="good", num_servers=50):
192 self.num_servers = num_servers
193 if type(mode) is str:
194 mode = dict([i,mode] for i in range(num_servers))
195 peers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
196 for fakeid in range(self.num_servers) ]
197 self.storage_broker = StorageFarmBroker(None, permute_peers=True)
198 for (serverid, server) in peers:
199 self.storage_broker.test_add_server(serverid, server)
200 self.last_peers = [p[1] for p in peers]
202 def log(self, *args, **kwargs):
204 def get_encoding_parameters(self):
205 return self.DEFAULT_ENCODING_PARAMETERS
206 def get_storage_broker(self):
207 return self.storage_broker
208 _secret_holder = client.SecretHolder("lease secret", "convergence secret")
210 class GotTooFarError(Exception):
213 class GiganticUploadable(upload.FileHandle):
214 def __init__(self, size):
218 def get_encryption_key(self):
219 return defer.succeed("\x00" * 16)
221 return defer.succeed(self._size)
222 def read(self, length):
223 left = self._size - self._fp
224 length = min(left, length)
226 if self._fp > 1000000:
227 # terminate the test early.
228 raise GotTooFarError("we shouldn't be allowed to get this far")
229 return defer.succeed(["\x00" * length])
234 Once upon a time, there was a beautiful princess named Buttercup. She lived
235 in a magical land where every file was stored securely among millions of
236 machines, and nobody ever worried about their data being lost ever again.
239 assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
243 SIZE_LARGE = len(DATA)
def upload_data(uploader, data):
    """Upload the byte string `data` through `uploader`; return a Deferred."""
    uploadable = upload.Data(data, convergence=None)
    return uploader.upload(uploadable)
def upload_filename(uploader, filename):
    """Upload the file at `filename` through `uploader`; return a Deferred."""
    uploadable = upload.FileName(filename, convergence=None)
    return uploader.upload(uploadable)
def upload_filehandle(uploader, fh):
    """Upload the open filehandle `fh` through `uploader`; return a Deferred."""
    uploadable = upload.FileHandle(fh, convergence=None)
    return uploader.upload(uploadable)
255 class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
257 self.node = FakeClient(mode="good")
258 self.u = upload.Uploader()
259 self.u.running = True
260 self.u.parent = self.node
262 def _check_small(self, newuri, size):
263 u = uri.from_string(newuri)
264 self.failUnless(isinstance(u, uri.LiteralFileURI))
265 self.failUnlessEqual(len(u.data), size)
267 def _check_large(self, newuri, size):
268 u = uri.from_string(newuri)
269 self.failUnless(isinstance(u, uri.CHKFileURI))
270 self.failUnless(isinstance(u.get_storage_index(), str))
271 self.failUnlessEqual(len(u.get_storage_index()), 16)
272 self.failUnless(isinstance(u.key, str))
273 self.failUnlessEqual(len(u.key), 16)
274 self.failUnlessEqual(u.size, size)
276 def get_data(self, size):
279 def test_too_large(self):
280 # we've removed the 4GiB share size limit (see ticket #346 for
281 # details), but still have an 8-byte field, so the limit is now
282 # 2**64, so make sure we reject files larger than that.
283 k = 3; happy = 7; n = 10
284 self.set_encoding_parameters(k, happy, n)
286 data1 = GiganticUploadable(big)
287 d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
288 "This file is too large to be uploaded (data_size)",
289 self.u.upload, data1)
290 data2 = GiganticUploadable(big-3)
291 d.addCallback(lambda res:
292 self.shouldFail(FileTooLargeError,
293 "test_too_large-data2",
294 "This file is too large to be uploaded (offsets)",
295 self.u.upload, data2))
296 # I don't know where the actual limit is.. it depends upon how large
297 # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
300 def test_data_zero(self):
301 data = self.get_data(SIZE_ZERO)
302 d = upload_data(self.u, data)
303 d.addCallback(extract_uri)
304 d.addCallback(self._check_small, SIZE_ZERO)
307 def test_data_small(self):
308 data = self.get_data(SIZE_SMALL)
309 d = upload_data(self.u, data)
310 d.addCallback(extract_uri)
311 d.addCallback(self._check_small, SIZE_SMALL)
314 def test_data_large(self):
315 data = self.get_data(SIZE_LARGE)
316 d = upload_data(self.u, data)
317 d.addCallback(extract_uri)
318 d.addCallback(self._check_large, SIZE_LARGE)
321 def test_data_large_odd_segments(self):
322 data = self.get_data(SIZE_LARGE)
323 segsize = int(SIZE_LARGE / 2.5)
324 # we want 3 segments, since that's not a power of two
325 self.set_encoding_parameters(25, 25, 100, segsize)
326 d = upload_data(self.u, data)
327 d.addCallback(extract_uri)
328 d.addCallback(self._check_large, SIZE_LARGE)
331 def test_filehandle_zero(self):
332 data = self.get_data(SIZE_ZERO)
333 d = upload_filehandle(self.u, StringIO(data))
334 d.addCallback(extract_uri)
335 d.addCallback(self._check_small, SIZE_ZERO)
338 def test_filehandle_small(self):
339 data = self.get_data(SIZE_SMALL)
340 d = upload_filehandle(self.u, StringIO(data))
341 d.addCallback(extract_uri)
342 d.addCallback(self._check_small, SIZE_SMALL)
345 def test_filehandle_large(self):
346 data = self.get_data(SIZE_LARGE)
347 d = upload_filehandle(self.u, StringIO(data))
348 d.addCallback(extract_uri)
349 d.addCallback(self._check_large, SIZE_LARGE)
352 def test_filename_zero(self):
353 fn = "Uploader-test_filename_zero.data"
355 data = self.get_data(SIZE_ZERO)
358 d = upload_filename(self.u, fn)
359 d.addCallback(extract_uri)
360 d.addCallback(self._check_small, SIZE_ZERO)
363 def test_filename_small(self):
364 fn = "Uploader-test_filename_small.data"
366 data = self.get_data(SIZE_SMALL)
369 d = upload_filename(self.u, fn)
370 d.addCallback(extract_uri)
371 d.addCallback(self._check_small, SIZE_SMALL)
374 def test_filename_large(self):
375 fn = "Uploader-test_filename_large.data"
377 data = self.get_data(SIZE_LARGE)
380 d = upload_filename(self.u, fn)
381 d.addCallback(extract_uri)
382 d.addCallback(self._check_large, SIZE_LARGE)
385 class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
386 def make_node(self, mode, num_servers=10):
387 self.node = FakeClient(mode, num_servers)
388 self.u = upload.Uploader()
389 self.u.running = True
390 self.u.parent = self.node
392 def _check_large(self, newuri, size):
393 u = uri.from_string(newuri)
394 self.failUnless(isinstance(u, uri.CHKFileURI))
395 self.failUnless(isinstance(u.get_storage_index(), str))
396 self.failUnlessEqual(len(u.get_storage_index()), 16)
397 self.failUnless(isinstance(u.key, str))
398 self.failUnlessEqual(len(u.key), 16)
399 self.failUnlessEqual(u.size, size)
401 def test_first_error(self):
402 mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
404 self.set_encoding_parameters(k=25, happy=1, n=50)
405 d = upload_data(self.u, DATA)
406 d.addCallback(extract_uri)
407 d.addCallback(self._check_large, SIZE_LARGE)
410 def test_first_error_all(self):
411 self.make_node("first-fail")
412 d = self.shouldFail(UploadUnhappinessError, "first_error_all",
413 "peer selection failed",
414 upload_data, self.u, DATA)
416 self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
417 # there should also be a 'last failure was' message
418 self.failUnlessIn("ServerError", str(f.value))
419 d.addCallback(_check)
422 def test_second_error(self):
423 # we want to make sure we make it to a third pass. This means that
424 # the first pass was insufficient to place all shares, and at least
425 # one of second pass servers (other than the last one) accepted a
426 # share (so we'll believe that a third pass will be useful). (if
427 # everyone but the last server throws an error, then we'll send all
428 # the remaining shares to the last server at the end of the second
429 # pass, and if that succeeds, we won't make it to a third pass).
431 # we can achieve this 97.5% of the time by using 40 servers, having
432 # 39 of them fail on the second request, leaving only one to succeed
433 # on the second request. (we need to keep the number of servers low
434 # enough to ensure a second pass with 100 shares).
435 mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
436 self.make_node(mode, 40)
437 d = upload_data(self.u, DATA)
438 d.addCallback(extract_uri)
439 d.addCallback(self._check_large, SIZE_LARGE)
442 def test_second_error_all(self):
443 self.make_node("second-fail")
444 d = self.shouldFail(UploadUnhappinessError, "second_error_all",
445 "peer selection failed",
446 upload_data, self.u, DATA)
448 self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
449 # there should also be a 'last failure was' message
450 self.failUnlessIn("ServerError", str(f.value))
451 d.addCallback(_check)
454 class FullServer(unittest.TestCase):
456 self.node = FakeClient(mode="full")
457 self.u = upload.Uploader()
458 self.u.running = True
459 self.u.parent = self.node
461 def _should_fail(self, f):
462 self.failUnless(isinstance(f, Failure) and f.check(UploadUnhappinessError), f)
464 def test_data_large(self):
466 d = upload_data(self.u, data)
467 d.addBoth(self._should_fail)
470 class PeerSelection(unittest.TestCase):
472 def make_client(self, num_servers=50):
473 self.node = FakeClient(mode="good", num_servers=num_servers)
474 self.u = upload.Uploader()
475 self.u.running = True
476 self.u.parent = self.node
478 def get_data(self, size):
481 def _check_large(self, newuri, size):
482 u = uri.from_string(newuri)
483 self.failUnless(isinstance(u, uri.CHKFileURI))
484 self.failUnless(isinstance(u.get_storage_index(), str))
485 self.failUnlessEqual(len(u.get_storage_index()), 16)
486 self.failUnless(isinstance(u.key, str))
487 self.failUnlessEqual(len(u.key), 16)
488 self.failUnlessEqual(u.size, size)
490 def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
494 "max_segment_size": max_segsize,
496 self.node.DEFAULT_ENCODING_PARAMETERS = p
498 def test_one_each(self):
499 # if we have 50 shares, and there are 50 peers, and they all accept a
500 # share, we should get exactly one share per peer
503 data = self.get_data(SIZE_LARGE)
504 self.set_encoding_parameters(25, 30, 50)
505 d = upload_data(self.u, data)
506 d.addCallback(extract_uri)
507 d.addCallback(self._check_large, SIZE_LARGE)
509 for p in self.node.last_peers:
510 allocated = p.allocated
511 self.failUnlessEqual(len(allocated), 1)
512 self.failUnlessEqual(p.queries, 1)
513 d.addCallback(_check)
516 def test_two_each(self):
517 # if we have 100 shares, and there are 50 peers, and they all accept
518 # all shares, we should get exactly two shares per peer
521 data = self.get_data(SIZE_LARGE)
522 # if there are 50 peers, then happy needs to be <= 50
523 self.set_encoding_parameters(50, 50, 100)
524 d = upload_data(self.u, data)
525 d.addCallback(extract_uri)
526 d.addCallback(self._check_large, SIZE_LARGE)
528 for p in self.node.last_peers:
529 allocated = p.allocated
530 self.failUnlessEqual(len(allocated), 2)
531 self.failUnlessEqual(p.queries, 2)
532 d.addCallback(_check)
535 def test_one_each_plus_one_extra(self):
536 # if we have 51 shares, and there are 50 peers, then one peer gets
537 # two shares and the rest get just one
540 data = self.get_data(SIZE_LARGE)
541 self.set_encoding_parameters(24, 41, 51)
542 d = upload_data(self.u, data)
543 d.addCallback(extract_uri)
544 d.addCallback(self._check_large, SIZE_LARGE)
548 for p in self.node.last_peers:
549 allocated = p.allocated
550 self.failUnless(len(allocated) in (1,2), len(allocated))
551 if len(allocated) == 1:
552 self.failUnlessEqual(p.queries, 1)
555 self.failUnlessEqual(p.queries, 2)
557 self.failUnlessEqual(len(got_one), 49)
558 self.failUnlessEqual(len(got_two), 1)
559 d.addCallback(_check)
562 def test_four_each(self):
563 # if we have 200 shares, and there are 50 peers, then each peer gets
564 # 4 shares. The design goal is to accomplish this with only two
568 data = self.get_data(SIZE_LARGE)
569 # if there are 50 peers, then happy should be no more than 50 if
570 # we want this to work.
571 self.set_encoding_parameters(100, 50, 200)
572 d = upload_data(self.u, data)
573 d.addCallback(extract_uri)
574 d.addCallback(self._check_large, SIZE_LARGE)
576 for p in self.node.last_peers:
577 allocated = p.allocated
578 self.failUnlessEqual(len(allocated), 4)
579 self.failUnlessEqual(p.queries, 2)
580 d.addCallback(_check)
583 def test_three_of_ten(self):
584 # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
588 data = self.get_data(SIZE_LARGE)
589 self.set_encoding_parameters(3, 3, 10)
590 d = upload_data(self.u, data)
591 d.addCallback(extract_uri)
592 d.addCallback(self._check_large, SIZE_LARGE)
595 for p in self.node.last_peers:
596 allocated = p.allocated
597 counts[len(allocated)] = counts.get(len(allocated), 0) + 1
598 histogram = [counts.get(i, 0) for i in range(5)]
599 self.failUnlessEqual(histogram, [0,0,0,2,1])
600 d.addCallback(_check)
603 def test_some_big_some_small(self):
604 # 10 shares, 20 servers, but half the servers don't support a
605 # share-size large enough for our file
606 mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)])
607 self.node = FakeClient(mode, num_servers=20)
608 self.u = upload.Uploader()
609 self.u.running = True
610 self.u.parent = self.node
612 data = self.get_data(SIZE_LARGE)
613 self.set_encoding_parameters(3, 5, 10)
614 d = upload_data(self.u, data)
615 d.addCallback(extract_uri)
616 d.addCallback(self._check_large, SIZE_LARGE)
618 # we should have put one share each on the big peers, and zero
619 # shares on the small peers
621 for p in self.node.last_peers:
623 self.failUnlessEqual(len(p.allocated), 1)
624 elif p.mode == "small":
625 self.failUnlessEqual(len(p.allocated), 0)
626 total_allocated += len(p.allocated)
627 self.failUnlessEqual(total_allocated, 10)
628 d.addCallback(_check)
632 class StorageIndex(unittest.TestCase):
633 def test_params_must_matter(self):
634 DATA = "I am some data"
635 u = upload.Data(DATA, convergence="")
636 eu = upload.EncryptAnUploadable(u)
637 d1 = eu.get_storage_index()
639 # CHK means the same data should encrypt the same way
640 u = upload.Data(DATA, convergence="")
641 eu = upload.EncryptAnUploadable(u)
642 d1a = eu.get_storage_index()
644 # but if we use a different convergence string it should be different
645 u = upload.Data(DATA, convergence="wheee!")
646 eu = upload.EncryptAnUploadable(u)
647 d1salt1 = eu.get_storage_index()
649 # and if we add yet a different convergence it should be different again
650 u = upload.Data(DATA, convergence="NOT wheee!")
651 eu = upload.EncryptAnUploadable(u)
652 d1salt2 = eu.get_storage_index()
654 # and if we use the first string again it should be the same as last time
655 u = upload.Data(DATA, convergence="wheee!")
656 eu = upload.EncryptAnUploadable(u)
657 d1salt1a = eu.get_storage_index()
659 # and if we change the encoding parameters, it should be different (from the same convergence string with different encoding parameters)
660 u = upload.Data(DATA, convergence="")
661 u.encoding_param_k = u.default_encoding_param_k + 1
662 eu = upload.EncryptAnUploadable(u)
663 d2 = eu.get_storage_index()
665 # and if we use a random key, it should be different than the CHK
666 u = upload.Data(DATA, convergence=None)
667 eu = upload.EncryptAnUploadable(u)
668 d3 = eu.get_storage_index()
669 # and different from another instance
670 u = upload.Data(DATA, convergence=None)
671 eu = upload.EncryptAnUploadable(u)
672 d4 = eu.get_storage_index()
674 d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
676 si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
677 self.failUnlessEqual(si1, si1a)
678 self.failIfEqual(si1, si2)
679 self.failIfEqual(si1, si3)
680 self.failIfEqual(si1, si4)
681 self.failIfEqual(si3, si4)
682 self.failIfEqual(si1salt1, si1)
683 self.failIfEqual(si1salt1, si1salt2)
684 self.failIfEqual(si1salt2, si1)
685 self.failUnlessEqual(si1salt1, si1salt1a)
689 # copied from python docs because itertools.combinations was added in
690 # python 2.6 and we support >= 2.4.
def combinations(iterable, r):
    """Yield all r-length tuples of elements of `iterable`, in sorted order.

    Pure-python fallback copied from the itertools documentation, because
    itertools.combinations was added in python 2.6 and we support >= 2.4.

    combinations('ABCD', 2) --> AB AC AD BC BD CD
    combinations(range(4), 3) --> 012 013 023 123
    """
    pool = tuple(iterable)
    n = len(pool)
    if r > n:
        # not enough elements to choose r of them: yield nothing
        return
    indices = list(range(r))
    yield tuple(pool[i] for i in indices)
    while True:
        # find the rightmost index that can still be incremented
        for i in reversed(range(r)):
            if indices[i] != i + n - r:
                break
        else:
            # every index is at its maximum: we are done
            return
        indices[i] += 1
        # reset every index to the right of i to its smallest legal value
        for j in range(i+1, r):
            indices[j] = indices[j-1] + 1
        yield tuple(pool[i] for i in indices)
711 def is_happy_enough(servertoshnums, h, k):
712 """ I calculate whether servertoshnums achieves happiness level h. I do this with a naïve "brute force search" approach. (See src/allmydata/util/happinessutil.py for a better algorithm.) """
713 if len(servertoshnums) < h:
715 # print "servertoshnums: ", servertoshnums, h, k
716 for happysetcombo in combinations(servertoshnums.iterkeys(), h):
717 # print "happysetcombo: ", happysetcombo
718 for subsetcombo in combinations(happysetcombo, k):
719 shnums = reduce(set.union, [ servertoshnums[s] for s in subsetcombo ])
720 # print "subsetcombo: ", subsetcombo, ", shnums: ", shnums
722 # print "NOT HAAPP{Y", shnums, k
727 class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
729 def find_all_shares(self, unused=None):
730 """Locate shares on disk. Returns a dict that maps
731 server to set of sharenums.
733 assert self.g, "I tried to find a grid at self.g, but failed"
734 servertoshnums = {} # k: server, v: set(shnum)
736 for i, c in self.g.servers_by_number.iteritems():
737 for (dirp, dirns, fns) in os.walk(c.sharedir):
742 # Whoops, I guess that's not a share file then.
745 servertoshnums.setdefault(i, set()).add(sharenum)
747 return servertoshnums
749 def _do_upload_with_broken_servers(self, servers_to_break):
751 I act like a normal upload, but before I send the results of
752 Tahoe2PeerSelector to the Encoder, I break the first servers_to_break
753 PeerTrackers in the used_peers part of the return result.
755 assert self.g, "I tried to find a grid at self.g, but failed"
756 broker = self.g.clients[0].storage_broker
757 sh = self.g.clients[0]._secret_holder
758 data = upload.Data("data" * 10000, convergence="")
759 data.encoding_param_k = 3
760 data.encoding_param_happy = 4
761 data.encoding_param_n = 10
762 uploadable = upload.EncryptAnUploadable(data)
763 encoder = encode.Encoder()
764 encoder.set_encrypted_uploadable(uploadable)
765 status = upload.UploadStatus()
766 selector = upload.Tahoe2PeerSelector("dglev", "test", status)
767 storage_index = encoder.get_param("storage_index")
768 share_size = encoder.get_param("share_size")
769 block_size = encoder.get_param("block_size")
770 num_segments = encoder.get_param("num_segments")
771 d = selector.get_shareholders(broker, sh, storage_index,
772 share_size, block_size, num_segments,
774 def _have_shareholders((used_peers, already_peers)):
775 assert servers_to_break <= len(used_peers)
776 for index in xrange(servers_to_break):
777 server = list(used_peers)[index]
778 for share in server.buckets.keys():
779 server.buckets[share].abort()
781 servermap = already_peers.copy()
782 for peer in used_peers:
783 buckets.update(peer.buckets)
784 for bucket in peer.buckets:
785 servermap.setdefault(bucket, set()).add(peer.peerid)
786 encoder.set_shareholders(buckets, servermap)
789 d.addCallback(_have_shareholders)
792 def _has_happy_share_distribution(self):
793 servertoshnums = self.find_all_shares()
794 k = self.g.clients[0].DEFAULT_ENCODING_PARAMETERS['k']
795 h = self.g.clients[0].DEFAULT_ENCODING_PARAMETERS['happy']
796 return is_happy_enough(servertoshnums, h, k)
798 def _add_server(self, server_number, readonly=False):
799 assert self.g, "I tried to find a grid at self.g, but failed"
800 ss = self.g.make_server(server_number, readonly)
801 self.g.add_server(server_number, ss)
804 def _add_server_with_share(self, server_number, share_number=None,
806 self._add_server(server_number, readonly)
807 if share_number is not None:
808 self._copy_share_to_server(share_number, server_number)
811 def _copy_share_to_server(self, share_number, server_number):
812 ss = self.g.servers_by_number[server_number]
813 # Copy share i from the directory associated with the first
814 # storage server to the directory associated with this one.
815 assert self.g, "I tried to find a grid at self.g, but failed"
816 assert self.shares, "I tried to find shares at self.shares, but failed"
817 old_share_location = self.shares[share_number][2]
818 new_share_location = os.path.join(ss.storedir, "shares")
819 si = uri.from_string(self.uri).get_storage_index()
820 new_share_location = os.path.join(new_share_location,
821 storage_index_to_dir(si))
822 if not os.path.exists(new_share_location):
823 os.makedirs(new_share_location)
824 new_share_location = os.path.join(new_share_location,
826 if old_share_location != new_share_location:
827 shutil.copy(old_share_location, new_share_location)
828 shares = self.find_uri_shares(self.uri)
829 # Make sure that the storage server has the share.
830 self.failUnless((share_number, ss.my_nodeid, new_share_location)
833 def _setup_grid(self):
835 I set up a NoNetworkGrid with a single server and client.
837 self.set_up_grid(num_clients=1, num_servers=1)
839 def _setup_and_upload(self, **kwargs):
841 I set up a NoNetworkGrid with a single server and client,
842 upload a file to it, store its uri in self.uri, and store its
843 sharedata in self.shares.
846 client = self.g.clients[0]
847 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
848 if "n" in kwargs and "k" in kwargs:
849 client.DEFAULT_ENCODING_PARAMETERS['k'] = kwargs['k']
850 client.DEFAULT_ENCODING_PARAMETERS['n'] = kwargs['n']
851 data = upload.Data("data" * 10000, convergence="")
853 d = client.upload(data)
856 d.addCallback(_store_uri)
857 d.addCallback(lambda ign:
858 self.find_uri_shares(self.uri))
859 def _store_shares(shares):
861 d.addCallback(_store_shares)
865 def test_configure_parameters(self):
866 self.basedir = self.mktemp()
867 hooks = {0: self._set_up_nodes_extra_config}
868 self.set_up_grid(client_config_hooks=hooks)
869 c0 = self.g.clients[0]
872 u = upload.Data(DATA, convergence="")
874 d.addCallback(lambda ur: c0.create_node_from_uri(ur.uri))
875 m = monitor.Monitor()
876 d.addCallback(lambda fn: fn.check(m))
879 self.failUnlessEqual(data["count-shares-needed"], 7)
880 self.failUnlessEqual(data["count-shares-expected"], 12)
881 d.addCallback(_check)
885 def _setUp(self, ns):
886 # Used by test_happy_semantics and test_preexisting_share_behavior
887 # to set up the grid.
888 self.node = FakeClient(mode="good", num_servers=ns)
889 self.u = upload.Uploader()
890 self.u.running = True
891 self.u.parent = self.node
894 def test_happy_semantics(self):
896 DATA = upload.Data("kittens" * 10000, convergence="")
897 # These parameters are unsatisfiable with only 2 servers.
898 self.set_encoding_parameters(k=3, happy=5, n=10)
899 d = self.shouldFail(UploadUnhappinessError, "test_happy_semantics",
900 "shares could be placed or found on only 2 "
901 "server(s). We were asked to place shares on "
902 "at least 5 server(s) such that any 3 of them "
903 "have enough shares to recover the file",
905 # Let's reset the client to have 10 servers
906 d.addCallback(lambda ign:
908 # These parameters are satisfiable with 10 servers.
909 d.addCallback(lambda ign:
910 self.set_encoding_parameters(k=3, happy=5, n=10))
911 d.addCallback(lambda ign:
913 # Let's reset the client to have 7 servers
914 # (this is less than n, but more than h)
915 d.addCallback(lambda ign:
917 # These parameters are satisfiable with 7 servers.
918 d.addCallback(lambda ign:
919 self.set_encoding_parameters(k=3, happy=5, n=10))
920 d.addCallback(lambda ign:
924 def test_aborted_shares(self):
925 self.basedir = "upload/EncodingParameters/aborted_shares"
926 self.set_up_grid(num_servers=4)
927 c = self.g.clients[0]
928 DATA = upload.Data(100* "kittens", convergence="")
929 # These parameters are unsatisfiable with only 4 servers, but should
930 # work with 5, as long as the original 4 are not stuck in the open
931 # BucketWriter state (open() but not
932 parms = {"k":2, "happy":5, "n":5, "max_segment_size": 1*MiB}
933 c.DEFAULT_ENCODING_PARAMETERS = parms
934 d = self.shouldFail(UploadUnhappinessError, "test_aborted_shares",
935 "shares could be placed on only 4 "
936 "server(s) such that any 2 of them have enough "
937 "shares to recover the file, but we were asked "
938 "to place shares on at least 5 such servers",
940 # now add the 5th server
941 d.addCallback(lambda ign: self._add_server(4, False))
942 # and this time the upload ought to succeed
943 d.addCallback(lambda ign: c.upload(DATA))
944 d.addCallback(lambda ign:
945 self.failUnless(self._has_happy_share_distribution()))
949 def test_problem_layout_comment_52(self):
951 self.basedir = self.mktemp()
953 # This scenario is at
954 # http://allmydata.org/trac/tahoe/ticket/778#comment:52
956 # The scenario in comment:52 proposes that we have a layout
958 # server 0: shares 1 - 9
959 # server 1: share 0, read-only
960 # server 2: share 0, read-only
961 # server 3: share 0, read-only
962 # To get access to the shares, we will first upload to one
963 # server, which will then have shares 0 - 9. We'll then
964 # add three new servers, configure them to not accept any new
965 # shares, then write share 0 directly into the serverdir of each,
966 # and then remove share 0 from server 0 in the same way.
967 # Then each of servers 1 - 3 will report that they have share 0,
968 # and will not accept any new share, while server 0 will report that
969 # it has shares 1 - 9 and will accept new shares.
970 # We'll then set 'happy' = 4, and see that an upload fails
972 d = self._setup_and_upload()
973 d.addCallback(lambda ign:
974 self._add_server_with_share(server_number=1, share_number=0,
976 d.addCallback(lambda ign:
977 self._add_server_with_share(server_number=2, share_number=0,
979 d.addCallback(lambda ign:
980 self._add_server_with_share(server_number=3, share_number=0,
982 # Remove the first share from server 0.
983 def _remove_share_0_from_server_0():
984 share_location = self.shares[0][2]
985 os.remove(share_location)
986 d.addCallback(lambda ign:
987 _remove_share_0_from_server_0())
988 # Set happy = 4 in the client.
990 client = self.g.clients[0]
991 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
993 d.addCallback(lambda ign:
995 # Uploading data should fail
996 d.addCallback(lambda client:
997 self.shouldFail(UploadUnhappinessError,
998 "test_problem_layout_comment_52_test_1",
999 "shares could be placed or found on 4 server(s), "
1000 "but they are not spread out evenly enough to "
1001 "ensure that any 3 of these servers would have "
1002 "enough shares to recover the file. "
1003 "We were asked to place shares on at "
1004 "least 4 servers such that any 3 of them have "
1005 "enough shares to recover the file",
1006 client.upload, upload.Data("data" * 10000,
1009 # Do comment:52, but like this:
1011 # server 3: share 0, read-only
1012 # server 1: share 0, read-only
1013 # server 0: shares 0-9
1014 d.addCallback(lambda ign:
1016 d.addCallback(lambda ign:
1017 self._setup_and_upload())
1018 d.addCallback(lambda ign:
1019 self._add_server(server_number=2))
1020 d.addCallback(lambda ign:
1021 self._add_server_with_share(server_number=3, share_number=0,
1023 d.addCallback(lambda ign:
1024 self._add_server_with_share(server_number=1, share_number=0,
1027 client = self.g.clients[0]
1028 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1030 d.addCallback(lambda ign:
1032 d.addCallback(lambda client:
1033 self.shouldFail(UploadUnhappinessError,
1034 "test_problem_layout_comment_52_test_2",
1035 "shares could be placed on only 3 server(s) such "
1036 "that any 3 of them have enough shares to recover "
1037 "the file, but we were asked to place shares on "
1038 "at least 4 such servers.",
1039 client.upload, upload.Data("data" * 10000,
1044 def test_problem_layout_comment_53(self):
# NOTE(review): this listing has baked-in line numbers and elided lines (gaps
# in the numbering, e.g. the `return client` of _reset_encoding_parameters at
# original line 1078). Code kept byte-identical; restore missing lines from
# upstream test_upload.py before running.
# Purpose (from visible code): exercise ticket #778 comment:53 layouts —
# re-uploading with happy=4 should succeed and yield a happy distribution;
# the third scenario also asserts exactly 3 shares get pushed.
1045 # This scenario is at
1046 # http://allmydata.org/trac/tahoe/ticket/778#comment:53
1048 # Set up the grid to have one server
1049 def _change_basedir(ign):
1050 self.basedir = self.mktemp()
1051 _change_basedir(None)
1052 # We start by uploading all of the shares to one server.
1053 # Next, we'll add three new servers to our NoNetworkGrid. We'll add
1054 # one share from our initial upload to each of these.
1055 # The counterintuitive ordering of the share numbers is to deal with
1056 # the permuting of these servers -- distributing the shares this
1057 # way ensures that the Tahoe2PeerSelector sees them in the order
1059 d = self._setup_and_upload()
1060 d.addCallback(lambda ign:
1061 self._add_server_with_share(server_number=1, share_number=2))
1062 d.addCallback(lambda ign:
1063 self._add_server_with_share(server_number=2, share_number=0))
1064 d.addCallback(lambda ign:
1065 self._add_server_with_share(server_number=3, share_number=1))
1066 # So, we now have the following layout:
1067 # server 0: shares 0 - 9
1071 # We change the 'happy' parameter in the client to 4.
1072 # The Tahoe2PeerSelector will see the peers permuted as:
1074 # Ideally, a reupload of our original data should work.
1075 def _reset_encoding_parameters(ign, happy=4):
1076 client = self.g.clients[0]
1077 client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
# NOTE(review): `return client` (original line 1078) is elided here; the next
# callback expects the client as its argument.
1079 d.addCallback(_reset_encoding_parameters)
1080 d.addCallback(lambda client:
1081 client.upload(upload.Data("data" * 10000, convergence="")))
1082 d.addCallback(lambda ign:
1083 self.failUnless(self._has_happy_share_distribution()))
# Scenario 2: same as above, but the selector sees the full server first.
1086 # This scenario is basically comment:53, but changed so that the
1087 # Tahoe2PeerSelector sees the server with all of the shares before
1088 # any of the other servers.
1090 # server 2: shares 0 - 9
1094 # The Tahoe2PeerSelector sees the peers permuted as:
1096 # Note that server 0 has been replaced by server 4; this makes it
1097 # easier to ensure that the last server seen by Tahoe2PeerSelector
1098 # has only one share.
1099 d.addCallback(_change_basedir)
1100 d.addCallback(lambda ign:
1101 self._setup_and_upload())
1102 d.addCallback(lambda ign:
1103 self._add_server_with_share(server_number=2, share_number=0))
1104 d.addCallback(lambda ign:
1105 self._add_server_with_share(server_number=3, share_number=1))
1106 d.addCallback(lambda ign:
1107 self._add_server_with_share(server_number=1, share_number=2))
1108 # Copy all of the other shares to server number 2
1109 def _copy_shares(ign):
1110 for i in xrange(0, 10):
1111 self._copy_share_to_server(i, 2)
1112 d.addCallback(_copy_shares)
1113 # Remove the first server, and add a placeholder with share 0
1114 d.addCallback(lambda ign:
1115 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1116 d.addCallback(lambda ign:
1117 self._add_server_with_share(server_number=4, share_number=0))
1118 # Now try uploading.
1119 d.addCallback(_reset_encoding_parameters)
1120 d.addCallback(lambda client:
1121 client.upload(upload.Data("data" * 10000, convergence="")))
1122 d.addCallback(lambda ign:
1123 self.failUnless(self._has_happy_share_distribution()))
# Scenario 3: empty servers after the first — forces actual redistribution.
1126 # Try the same thing, but with empty servers after the first one
1127 # We want to make sure that Tahoe2PeerSelector will redistribute
1128 # shares as necessary, not simply discover an existing layout.
1130 # server 2: shares 0 - 9
1134 d.addCallback(_change_basedir)
1135 d.addCallback(lambda ign:
1136 self._setup_and_upload())
1137 d.addCallback(lambda ign:
1138 self._add_server(server_number=2))
1139 d.addCallback(lambda ign:
1140 self._add_server(server_number=3))
1141 d.addCallback(lambda ign:
1142 self._add_server(server_number=1))
1143 d.addCallback(lambda ign:
1144 self._add_server(server_number=4))
1145 d.addCallback(_copy_shares)
1146 d.addCallback(lambda ign:
1147 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1148 d.addCallback(_reset_encoding_parameters)
1149 d.addCallback(lambda client:
1150 client.upload(upload.Data("data" * 10000, convergence="")))
1151 # Make sure that only as many shares as necessary to satisfy
1152 # servers of happiness were pushed.
1153 d.addCallback(lambda results:
1154 self.failUnlessEqual(results.pushed_shares, 3))
1155 d.addCallback(lambda ign:
1156 self.failUnless(self._has_happy_share_distribution()))
1159 def test_problem_layout_ticket1124(self):
# NOTE(review): elided lines in this listing include the `def _setup(ign):`
# header referenced by `d.addCallback(_setup)` below; restore from upstream
# before running. Marked .todo below, so trial treats failure as expected.
# Purpose: ticket #1124 layout with k=2, n=4 — upload with happy=4 should
# be satisfied by the existing distribution.
1160 self.basedir = self.mktemp()
1161 d = self._setup_and_upload(k=2, n=4)
1163 # server 0: shares 0, 1, 2, 3
1164 # server 1: shares 0, 3
1167 # With this layout, an upload should just be satisfied that the current distribution is good enough, right?
1169 self._add_server_with_share(server_number=0, share_number=None)
1170 self._add_server_with_share(server_number=1, share_number=0)
1171 self._add_server_with_share(server_number=2, share_number=1)
1172 self._add_server_with_share(server_number=3, share_number=2)
1174 self._copy_share_to_server(3, 1)
1175 client = self.g.clients[0]
1176 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1179 d.addCallback(_setup)
1180 d.addCallback(lambda client:
1181 client.upload(upload.Data("data" * 10000, convergence="")))
1182 d.addCallback(lambda ign:
1183 self.failUnless(self._has_happy_share_distribution()))
1185 test_problem_layout_ticket1124.todo = "Fix this after 1.7.1 release."
1187 def test_happiness_with_some_readonly_peers(self):
# NOTE(review): several continuation lines (the `readonly=True))` halves of
# the _add_server_with_share calls, and `return client`) are elided in this
# listing; code kept byte-identical. Restore from upstream before running.
# Purpose: with one full server and three read-only single-share servers,
# an upload with happy=4 should still succeed with a happy distribution.
1188 # Try the following layout
1189 # server 2: shares 0-9
1190 # server 4: share 0, read-only
1191 # server 3: share 1, read-only
1192 # server 1: share 2, read-only
1193 self.basedir = self.mktemp()
1194 d = self._setup_and_upload()
1195 d.addCallback(lambda ign:
1196 self._add_server_with_share(server_number=2, share_number=0))
1197 d.addCallback(lambda ign:
1198 self._add_server_with_share(server_number=3, share_number=1,
1200 d.addCallback(lambda ign:
1201 self._add_server_with_share(server_number=1, share_number=2,
1203 # Copy all of the other shares to server number 2
1204 def _copy_shares(ign):
1205 for i in xrange(1, 10):
1206 self._copy_share_to_server(i, 2)
1207 d.addCallback(_copy_shares)
1208 # Remove server 0, and add another in its place
1209 d.addCallback(lambda ign:
1210 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1211 d.addCallback(lambda ign:
1212 self._add_server_with_share(server_number=4, share_number=0,
1214 def _reset_encoding_parameters(ign, happy=4):
1215 client = self.g.clients[0]
1216 client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1218 d.addCallback(_reset_encoding_parameters)
1219 d.addCallback(lambda client:
1220 client.upload(upload.Data("data" * 10000, convergence="")))
1221 d.addCallback(lambda ign:
1222 self.failUnless(self._has_happy_share_distribution()))
1226 def test_happiness_with_all_readonly_peers(self):
# NOTE(review): continuation lines (`readonly=True))`, `return client`) are
# elided in this listing; code kept byte-identical.
# Purpose: all storage servers read-only — the read-only share survey must
# still count toward servers-of-happiness, so upload with happy=4 succeeds.
1227 # server 3: share 1, read-only
1228 # server 1: share 2, read-only
1229 # server 2: shares 0-9, read-only
1230 # server 4: share 0, read-only
1231 # The idea with this test is to make sure that the survey of
1232 # read-only peers doesn't undercount servers of happiness
1233 self.basedir = self.mktemp()
1234 d = self._setup_and_upload()
1235 d.addCallback(lambda ign:
1236 self._add_server_with_share(server_number=4, share_number=0,
1238 d.addCallback(lambda ign:
1239 self._add_server_with_share(server_number=3, share_number=1,
1241 d.addCallback(lambda ign:
1242 self._add_server_with_share(server_number=1, share_number=2,
1244 d.addCallback(lambda ign:
1245 self._add_server_with_share(server_number=2, share_number=0,
1247 def _copy_shares(ign):
1248 for i in xrange(1, 10):
1249 self._copy_share_to_server(i, 2)
1250 d.addCallback(_copy_shares)
1251 d.addCallback(lambda ign:
1252 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1253 def _reset_encoding_parameters(ign, happy=4):
1254 client = self.g.clients[0]
1255 client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1257 d.addCallback(_reset_encoding_parameters)
1258 d.addCallback(lambda client:
1259 client.upload(upload.Data("data" * 10000, convergence="")))
1260 d.addCallback(lambda ign:
1261 self.failUnless(self._has_happy_share_distribution()))
1265 def test_dropped_servers_in_encoder(self):
# NOTE(review): lines elided in this listing include most of
# _do_server_setup_2 (original lines 1319-1321); code kept byte-identical.
# Purpose: servers break *after* peer selection but *before* writes; the
# Encoder's own happiness re-check must pass when 1 server drops (4 remain,
# happy=4) and raise UploadUnhappinessError when 2 drop.
1266 # The Encoder does its own "servers_of_happiness" check if it
1267 # happens to lose a bucket during an upload (it assumes that
1268 # the layout presented to it satisfies "servers_of_happiness"
1269 # until a failure occurs)
1271 # This test simulates an upload where servers break after peer
1272 # selection, but before they are written to.
1273 def _set_basedir(ign=None):
1274 self.basedir = self.mktemp()
1276 d = self._setup_and_upload();
# NOTE(review): stray trailing semicolon on the line above — harmless but
# un-Pythonic; left untouched since this is a documentation-only pass.
1278 def _do_server_setup(ign):
1279 self._add_server(server_number=1)
1280 self._add_server(server_number=2)
1281 self._add_server(server_number=3)
1282 self._add_server(server_number=4)
1283 self._add_server(server_number=5)
1284 d.addCallback(_do_server_setup)
1285 # remove the original server
1286 # (necessary to ensure that the Tahoe2PeerSelector will distribute
1288 def _remove_server(ign):
1289 server = self.g.servers_by_number[0]
1290 self.g.remove_server(server.my_nodeid)
1291 d.addCallback(_remove_server)
1292 # This should succeed; we still have 4 servers, and the
1293 # happiness of the upload is 4.
1294 d.addCallback(lambda ign:
1295 self._do_upload_with_broken_servers(1))
1296 # Now, do the same thing over again, but drop 2 servers instead
1297 # of 1. This should fail, because servers_of_happiness is 4 and
1298 # we can't satisfy that.
1299 d.addCallback(_set_basedir)
1300 d.addCallback(lambda ign:
1301 self._setup_and_upload())
1302 d.addCallback(_do_server_setup)
1303 d.addCallback(_remove_server)
1304 d.addCallback(lambda ign:
1305 self.shouldFail(UploadUnhappinessError,
1306 "test_dropped_servers_in_encoder",
1307 "shares could be placed on only 3 server(s) "
1308 "such that any 3 of them have enough shares to "
1309 "recover the file, but we were asked to place "
1310 "shares on at least 4",
1311 self._do_upload_with_broken_servers, 2))
1312 # Now do the same thing over again, but make some of the servers
1313 # readonly, break some of the ones that aren't, and make sure that
1314 # happiness accounting is preserved.
1315 d.addCallback(_set_basedir)
1316 d.addCallback(lambda ign:
1317 self._setup_and_upload())
1318 def _do_server_setup_2(ign):
1322 self._add_server_with_share(4, 7, readonly=True)
1323 self._add_server_with_share(5, 8, readonly=True)
1324 d.addCallback(_do_server_setup_2)
1325 d.addCallback(_remove_server)
1326 d.addCallback(lambda ign:
1327 self._do_upload_with_broken_servers(1))
1328 d.addCallback(_set_basedir)
1329 d.addCallback(lambda ign:
1330 self._setup_and_upload())
1331 d.addCallback(_do_server_setup_2)
1332 d.addCallback(_remove_server)
1333 d.addCallback(lambda ign:
1334 self.shouldFail(UploadUnhappinessError,
1335 "test_dropped_servers_in_encoder",
1336 "shares could be placed on only 3 server(s) "
1337 "such that any 3 of them have enough shares to "
1338 "recover the file, but we were asked to place "
1339 "shares on at least 4",
1340 self._do_upload_with_broken_servers, 2))
1344 def test_merge_peers(self):
# NOTE(review): this listing elides the dict-literal openers/closers
# (`shares = {`, `}`), the FakePeerTracker body, and the `trackers`
# accumulation lines; code kept byte-identical. Restore from upstream.
# Purpose: unit-test merge_peers() — with an empty used_peers set it returns
# the shares dict unchanged; with trackers it unions tracker peerids into
# the shnum -> set(peerid) mapping (asserted against `expected` below).
1345 # merge_peers merges a list of used_peers and a dict of
1346 # shareid -> peerid mappings.
1348 1 : set(["server1"]),
1349 2 : set(["server2"]),
1350 3 : set(["server3"]),
1351 4 : set(["server4", "server5"]),
1352 5 : set(["server1", "server2"]),
1354 # if not provided with a used_peers argument, it should just
1355 # return the first argument unchanged.
1356 self.failUnlessEqual(shares, merge_peers(shares, set([])))
1357 class FakePeerTracker:
1360 for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1361 t = FakePeerTracker()
1366 1 : set(["server1"]),
1367 2 : set(["server2"]),
1368 3 : set(["server3"]),
1369 4 : set(["server4", "server5"]),
1370 5 : set(["server1", "server2", "server5"]),
1371 6 : set(["server6"]),
1372 7 : set(["server7"]),
1373 8 : set(["server8"]),
1375 self.failUnlessEqual(expected, merge_peers(shares, set(trackers)))
1378 5 : set(["server5"]),
1379 6 : set(["server6"]),
1380 7 : set(["server7"]),
1381 8 : set(["server8"]),
1383 self.failUnlessEqual(expected, merge_peers(shares2, set(trackers)))
1387 for (i, server) in [(i, "server%d" % i) for i in xrange(10)]:
1388 shares3[i] = set([server])
1389 t = FakePeerTracker()
1393 expected[i] = set([server])
1394 self.failUnlessEqual(expected, merge_peers(shares3, set(trackers)))
1397 def test_servers_of_happiness_utility_function(self):
# NOTE(review): this listing elides dict openers/closers and the
# FakePeerTracker/trackers plumbing (gaps in the baked-in numbering); code
# kept byte-identical. Restore from upstream before running.
# Purpose: unit-test servers_of_happiness() — the size of a maximum
# bipartite matching between peerids and share numbers — on hand-built
# layouts, including the two "Zooko puzzles" from ticket #778.
1398 # These tests are concerned with the servers_of_happiness()
1399 # utility function, and its underlying matching algorithm. Other
1400 # aspects of the servers_of_happiness behavior are tested
1401 # elsewhere. These tests exist to ensure that
1402 # servers_of_happiness doesn't under or overcount the happiness
1403 # value for given inputs.
1405 # servers_of_happiness expects a dict of
1406 # shnum => set(peerids) as a preexisting shares argument.
1408 1 : set(["server1"]),
1409 2 : set(["server2"]),
1410 3 : set(["server3"]),
1411 4 : set(["server4"])
1413 happy = servers_of_happiness(test1)
1414 self.failUnlessEqual(4, happy)
1415 test1[4] = set(["server1"])
1416 # We've added a duplicate server, so now servers_of_happiness
1417 # should be 3 instead of 4.
1418 happy = servers_of_happiness(test1)
1419 self.failUnlessEqual(3, happy)
1420 # The second argument of merge_peers should be a set of
1421 # objects with peerid and buckets as attributes. In actual use,
1422 # these will be PeerTracker instances, but for testing it is fine
1423 # to make a FakePeerTracker whose job is to hold those instance
1424 # variables to test that part.
1425 class FakePeerTracker:
1428 for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1429 t = FakePeerTracker()
1433 # Recall that test1 is a server layout with servers_of_happiness
1434 # = 3. Since there isn't any overlap between the shnum ->
1435 # set([peerid]) correspondences in test1 and those in trackers,
1436 # the result here should be 7.
1437 test2 = merge_peers(test1, set(trackers))
1438 happy = servers_of_happiness(test2)
1439 self.failUnlessEqual(7, happy)
1440 # Now add an overlapping server to trackers. This is redundant,
1441 # so it should not cause the previously reported happiness value
1443 t = FakePeerTracker()
1444 t.peerid = "server1"
1447 test2 = merge_peers(test1, set(trackers))
1448 happy = servers_of_happiness(test2)
1449 self.failUnlessEqual(7, happy)
# NOTE(review): the `test = {}` (empty-layout) setup line is elided above —
# the assertion below checks the empty layout yields happiness 0.
1451 happy = servers_of_happiness(test)
1452 self.failUnlessEqual(0, happy)
1453 # Test a more substantial overlap between the trackers and the
1454 # existing assignments.
1456 1 : set(['server1']),
1457 2 : set(['server2']),
1458 3 : set(['server3']),
1459 4 : set(['server4']),
1462 t = FakePeerTracker()
1463 t.peerid = 'server5'
1466 t = FakePeerTracker()
1467 t.peerid = 'server6'
1470 # The value returned by servers_of_happiness is the size
1471 # of a maximum matching in the bipartite graph that
1472 # servers_of_happiness() makes between peerids and share
1473 # numbers. It should find something like this:
1474 # (server 1, share 1)
1475 # (server 2, share 2)
1476 # (server 3, share 3)
1477 # (server 5, share 4)
1478 # (server 6, share 5)
1480 # and, since there are 5 edges in this matching, it should
1482 test2 = merge_peers(test, set(trackers))
1483 happy = servers_of_happiness(test2)
1484 self.failUnlessEqual(5, happy)
1485 # Zooko's first puzzle:
1486 # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:156)
1488 # server 1: shares 0, 1
1489 # server 2: shares 1, 2
1492 # This should yield happiness of 3.
1494 0 : set(['server1']),
1495 1 : set(['server1', 'server2']),
1496 2 : set(['server2', 'server3']),
1498 self.failUnlessEqual(3, servers_of_happiness(test))
1499 # Zooko's second puzzle:
1500 # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:158)
1502 # server 1: shares 0, 1
1505 # This should yield happiness of 2.
1507 0 : set(['server1']),
1508 1 : set(['server1', 'server2']),
1510 self.failUnlessEqual(2, servers_of_happiness(test))
1513 def test_shares_by_server(self):
# NOTE(review): the `test1 = {` opener, closing `}`, and the expected
# AssertionError substring (original lines 1520, 1526, 1534) are elided in
# this listing; code kept byte-identical.
# Purpose: shares_by_server() inverts a shnum -> set(peerid) dict into
# peerid -> set(shnum) (asserted below), and rejects mappings whose values
# are not sets (AssertionError via shouldFail).
1514 test = dict([(i, set(["server%d" % i])) for i in xrange(1, 5)])
1515 sbs = shares_by_server(test)
1516 self.failUnlessEqual(set([1]), sbs["server1"])
1517 self.failUnlessEqual(set([2]), sbs["server2"])
1518 self.failUnlessEqual(set([3]), sbs["server3"])
1519 self.failUnlessEqual(set([4]), sbs["server4"])
1521 1 : set(["server1"]),
1522 2 : set(["server1"]),
1523 3 : set(["server1"]),
1524 4 : set(["server2"]),
1525 5 : set(["server2"])
1527 sbs = shares_by_server(test1)
1528 self.failUnlessEqual(set([1, 2, 3]), sbs["server1"])
1529 self.failUnlessEqual(set([4, 5]), sbs["server2"])
1530 # This should fail unless the peerid part of the mapping is a set
1531 test2 = {1: "server1"}
1532 self.shouldFail(AssertionError,
1533 "test_shares_by_server",
1535 shares_by_server, test2)
1538 def test_existing_share_detection(self):
# NOTE(review): `return client` inside _prepare_client (original line 1567)
# and some layout comment lines are elided in this listing; code kept
# byte-identical.
# Purpose: the peer selector must count shares held by a *read-only*
# server (server 1, holding all ten) toward happiness instead of filtering
# the server out; upload with happy=4 should succeed.
1539 self.basedir = self.mktemp()
1540 d = self._setup_and_upload()
1541 # Our final setup should look like this:
1542 # server 1: shares 0 - 9, read-only
1546 # The purpose of this test is to make sure that the peer selector
1547 # knows about the shares on server 1, even though it is read-only.
1548 # It used to simply filter these out, which would cause the test
1549 # to fail when servers_of_happiness = 4.
1550 d.addCallback(lambda ign:
1551 self._add_server_with_share(1, 0, True))
1552 d.addCallback(lambda ign:
1553 self._add_server(2))
1554 d.addCallback(lambda ign:
1555 self._add_server(3))
1556 d.addCallback(lambda ign:
1557 self._add_server(4))
1558 def _copy_shares(ign):
1559 for i in xrange(1, 10):
1560 self._copy_share_to_server(i, 1)
1561 d.addCallback(_copy_shares)
1562 d.addCallback(lambda ign:
1563 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1564 def _prepare_client(ign):
1565 client = self.g.clients[0]
1566 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1568 d.addCallback(_prepare_client)
1569 d.addCallback(lambda client:
1570 client.upload(upload.Data("data" * 10000, convergence="")))
1571 d.addCallback(lambda ign:
1572 self.failUnless(self._has_happy_share_distribution()))
1576 def test_query_counting(self):
# NOTE(review): the `def _setup(ign):`, `def _then(ign):`, `def _next(ign):`
# headers / `return c` lines and the `convergence=""` closers are elided in
# this listing (gaps in the baked-in numbering); code kept byte-identical.
# Purpose: happiness is set to an unsatisfiable 45 so the upload fails, and
# the UploadUnhappinessError message's query statistics ("N queries placed
# some shares", "N placed none due to the server being full") are checked
# for three grid configurations.
1577 # If peer selection fails, Tahoe2PeerSelector prints out a lot
1578 # of helpful diagnostic information, including query stats.
1579 # This test helps make sure that that information is accurate.
1580 self.basedir = self.mktemp()
1581 d = self._setup_and_upload()
1583 for i in xrange(1, 11):
1584 self._add_server(server_number=i)
1585 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1586 c = self.g.clients[0]
1587 # We set happy to an unsatisfiable value so that we can check the
1588 # counting in the exception message. The same progress message
1589 # is also used when the upload is successful, but in that case it
1590 # only gets written to a log, so we can't see what it says.
1591 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1593 d.addCallback(_setup)
1594 d.addCallback(lambda c:
1595 self.shouldFail(UploadUnhappinessError, "test_query_counting",
1596 "10 queries placed some shares",
1597 c.upload, upload.Data("data" * 10000,
1599 # Now try with some readonly servers. We want to make sure that
1600 # the readonly peer share discovery phase is counted correctly.
1602 self.basedir = self.mktemp()
1604 d.addCallback(_reset)
1605 d.addCallback(lambda ign:
1606 self._setup_and_upload())
1608 for i in xrange(1, 11):
1609 self._add_server(server_number=i)
1610 self._add_server(server_number=11, readonly=True)
1611 self._add_server(server_number=12, readonly=True)
1612 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1613 c = self.g.clients[0]
1614 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1616 d.addCallback(_then)
1617 d.addCallback(lambda c:
1618 self.shouldFail(UploadUnhappinessError, "test_query_counting",
1619 "2 placed none (of which 2 placed none due to "
1620 "the server being full",
1621 c.upload, upload.Data("data" * 10000,
1623 # Now try the case where the upload process finds a bunch of the
1624 # shares that it wants to place on the first server, including
1625 # the one that it wanted to allocate there. Though no shares will
1626 # be allocated in this request, it should still be called
1627 # productive, since it caused some homeless shares to be
1629 d.addCallback(_reset)
1630 d.addCallback(lambda ign:
1631 self._setup_and_upload())
1634 for i in xrange(1, 11):
1635 self._add_server(server_number=i)
1636 # Copy all of the shares to server 9, since that will be
1637 # the first one that the selector sees.
1638 for i in xrange(10):
1639 self._copy_share_to_server(i, 9)
1640 # Remove server 0, and its contents
1641 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1642 # Make happiness unsatisfiable
1643 c = self.g.clients[0]
1644 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1646 d.addCallback(_next)
1647 d.addCallback(lambda c:
1648 self.shouldFail(UploadUnhappinessError, "test_query_counting",
1649 "1 queries placed some shares",
1650 c.upload, upload.Data("data" * 10000,
1655 def test_upper_limit_on_readonly_queries(self):
# NOTE(review): the `def _then(ign):` header, `return c`, and the
# `client.upload,` argument line are elided in this listing; code kept
# byte-identical.
# Purpose: with 10 read-only servers and k=2/happy=4/n=4, the failure
# message must show the selector capped its read-only queries at 8
# ("sent 8 queries to 8 peers").
1656 self.basedir = self.mktemp()
1657 d = self._setup_and_upload()
1659 for i in xrange(1, 11):
1660 self._add_server(server_number=i, readonly=True)
1661 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1662 c = self.g.clients[0]
1663 c.DEFAULT_ENCODING_PARAMETERS['k'] = 2
1664 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1665 c.DEFAULT_ENCODING_PARAMETERS['n'] = 4
1667 d.addCallback(_then)
1668 d.addCallback(lambda client:
1669 self.shouldFail(UploadUnhappinessError,
1670 "test_upper_limit_on_readonly_queries",
1671 "sent 8 queries to 8 peers",
1673 upload.Data('data' * 10000, convergence="")))
1677 def test_exception_messages_during_peer_selection(self):
# NOTE(review): this listing elides `return client`, the `def _reset(ign):`
# header, several `client.upload,`/`self._setup_and_upload())` lines, and
# `readonly=True))` continuations (gaps in the baked-in numbering); code
# kept byte-identical. Restore from upstream before running.
# Purpose: drive peer selection into failure under five different grid
# layouts and assert the exact wording of each UploadUnhappinessError
# message (all-read-only, one-broken-server, too-few-servers, happy>placed,
# and unevenly-spread-shares cases).
1678 # server 1: read-only, no shares
1679 # server 2: read-only, no shares
1680 # server 3: read-only, no shares
1681 # server 4: read-only, no shares
1682 # server 5: read-only, no shares
1683 # This will fail, but we want to make sure that the log messages
1684 # are informative about why it has failed.
1685 self.basedir = self.mktemp()
1686 d = self._setup_and_upload()
1687 d.addCallback(lambda ign:
1688 self._add_server(server_number=1, readonly=True))
1689 d.addCallback(lambda ign:
1690 self._add_server(server_number=2, readonly=True))
1691 d.addCallback(lambda ign:
1692 self._add_server(server_number=3, readonly=True))
1693 d.addCallback(lambda ign:
1694 self._add_server(server_number=4, readonly=True))
1695 d.addCallback(lambda ign:
1696 self._add_server(server_number=5, readonly=True))
1697 d.addCallback(lambda ign:
1698 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1699 def _reset_encoding_parameters(ign, happy=4):
1700 client = self.g.clients[0]
1701 client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1703 d.addCallback(_reset_encoding_parameters)
1704 d.addCallback(lambda client:
1705 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1706 "placed 0 shares out of 10 "
1707 "total (10 homeless), want to place shares on at "
1708 "least 4 servers such that any 3 of them have "
1709 "enough shares to recover the file, "
1710 "sent 5 queries to 5 peers, 0 queries placed "
1711 "some shares, 5 placed none "
1712 "(of which 5 placed none due to the server being "
1713 "full and 0 placed none due to an error)",
1715 upload.Data("data" * 10000, convergence="")))
# Scenario 2: same, but server 2 is broken rather than read-only — the
# error tally should shift to "4 full / 1 error".
1718 # server 1: read-only, no shares
1719 # server 2: broken, no shares
1720 # server 3: read-only, no shares
1721 # server 4: read-only, no shares
1722 # server 5: read-only, no shares
1724 self.basedir = self.mktemp()
1725 d.addCallback(_reset)
1726 d.addCallback(lambda ign:
1727 self._setup_and_upload())
1728 d.addCallback(lambda ign:
1729 self._add_server(server_number=1, readonly=True))
1730 d.addCallback(lambda ign:
1731 self._add_server(server_number=2))
1732 def _break_server_2(ign):
1733 server = self.g.servers_by_number[2].my_nodeid
1734 # We have to break the server in servers_by_id,
1735 # because the one in servers_by_number isn't wrapped,
1736 # and doesn't look at its broken attribute when answering
1738 self.g.servers_by_id[server].broken = True
1739 d.addCallback(_break_server_2)
1740 d.addCallback(lambda ign:
1741 self._add_server(server_number=3, readonly=True))
1742 d.addCallback(lambda ign:
1743 self._add_server(server_number=4, readonly=True))
1744 d.addCallback(lambda ign:
1745 self._add_server(server_number=5, readonly=True))
1746 d.addCallback(lambda ign:
1747 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1748 d.addCallback(_reset_encoding_parameters)
1749 d.addCallback(lambda client:
1750 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1751 "placed 0 shares out of 10 "
1752 "total (10 homeless), want to place shares on at "
1753 "least 4 servers such that any 3 of them have "
1754 "enough shares to recover the file, "
1755 "sent 5 queries to 5 peers, 0 queries placed "
1756 "some shares, 5 placed none "
1757 "(of which 4 placed none due to the server being "
1758 "full and 1 placed none due to an error)",
1760 upload.Data("data" * 10000, convergence="")))
1761 # server 0, server 1 = empty, accepting shares
1762 # This should place all of the shares, but still fail with happy=4.
1763 # We want to make sure that the exception message is worded correctly.
1764 d.addCallback(_reset)
1765 d.addCallback(lambda ign:
1767 d.addCallback(lambda ign:
1768 self._add_server(server_number=1))
1769 d.addCallback(_reset_encoding_parameters)
1770 d.addCallback(lambda client:
1771 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1772 "shares could be placed or found on only 2 "
1773 "server(s). We were asked to place shares on at "
1774 "least 4 server(s) such that any 3 of them have "
1775 "enough shares to recover the file.",
1776 client.upload, upload.Data("data" * 10000,
1778 # servers 0 - 4 = empty, accepting shares
1779 # This too should place all the shares, and this too should fail,
1780 # but since the effective happiness is more than the k encoding
1781 # parameter, it should trigger a different error message than the one
1783 d.addCallback(_reset)
1784 d.addCallback(lambda ign:
1786 d.addCallback(lambda ign:
1787 self._add_server(server_number=1))
1788 d.addCallback(lambda ign:
1789 self._add_server(server_number=2))
1790 d.addCallback(lambda ign:
1791 self._add_server(server_number=3))
1792 d.addCallback(lambda ign:
1793 self._add_server(server_number=4))
1794 d.addCallback(_reset_encoding_parameters, happy=7)
1795 d.addCallback(lambda client:
1796 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1797 "shares could be placed on only 5 server(s) such "
1798 "that any 3 of them have enough shares to recover "
1799 "the file, but we were asked to place shares on "
1800 "at least 7 such servers.",
1801 client.upload, upload.Data("data" * 10000,
1803 # server 0: shares 0 - 9
1804 # server 1: share 0, read-only
1805 # server 2: share 0, read-only
1806 # server 3: share 0, read-only
1807 # This should place all of the shares, but fail with happy=4.
1808 # Since the number of servers with shares is more than the number
1809 # necessary to reconstitute the file, this will trigger a different
1810 # error message than either of those above.
1811 d.addCallback(_reset)
1812 d.addCallback(lambda ign:
1813 self._setup_and_upload())
1814 d.addCallback(lambda ign:
1815 self._add_server_with_share(server_number=1, share_number=0,
1817 d.addCallback(lambda ign:
1818 self._add_server_with_share(server_number=2, share_number=0,
1820 d.addCallback(lambda ign:
1821 self._add_server_with_share(server_number=3, share_number=0,
1823 d.addCallback(_reset_encoding_parameters, happy=7)
1824 d.addCallback(lambda client:
1825 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1826 "shares could be placed or found on 4 server(s), "
1827 "but they are not spread out evenly enough to "
1828 "ensure that any 3 of these servers would have "
1829 "enough shares to recover the file. We were asked "
1830 "to place shares on at least 7 servers such that "
1831 "any 3 of them have enough shares to recover the "
1833 client.upload, upload.Data("data" * 10000,
1838 def test_problem_layout_comment_187(self):
# NOTE(review): the `def _setup(ign):` header, `readonly=True)`
# continuations, and `return client` are elided in this listing; code kept
# byte-identical. Marked .todo below — trial expects this to fail.
# Purpose: ticket #778 comment:187 layout (k=2, n=3) that broke an early
# share-redistribution algorithm; upload with happy=3 should succeed.
1839 # #778 comment 187 broke an initial attempt at a share
1840 # redistribution algorithm. This test is here to demonstrate the
1841 # breakage, and to test that subsequent algorithms don't also
1842 # break in the same way.
1843 self.basedir = self.mktemp()
1844 d = self._setup_and_upload(k=2, n=3)
1846 # server 1: shares 0, 1, 2, readonly
1847 # server 2: share 0, readonly
1850 self._add_server_with_share(server_number=1, share_number=0,
1852 self._add_server_with_share(server_number=2, share_number=0,
1854 self._add_server_with_share(server_number=3, share_number=0)
1856 self._copy_share_to_server(1, 1)
1857 self._copy_share_to_server(2, 1)
1859 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1860 client = self.g.clients[0]
1861 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 3
1864 d.addCallback(_setup)
1865 d.addCallback(lambda client:
1866 client.upload(upload.Data("data" * 10000, convergence="")))
1867 d.addCallback(lambda ign:
1868 self.failUnless(self._has_happy_share_distribution()))
1870 test_problem_layout_comment_187.todo = "this isn't fixed yet"
1872 def test_problem_layout_ticket_1118(self):
# NOTE(review): the `def _setup(ign):` header, the `os.mkdir(storedir)`
# replacement-dir line, and `return client` are elided in this listing;
# code kept byte-identical.
# Purpose: reproduce the ticket #1118 layout (k=2, n=4, server 0 empty)
# that hit an assertion in the upload code; upload with happy=4 should
# succeed with a happy distribution.
1873 # #1118 includes a report from a user who hit an assertion in
1874 # the upload code with this layout.
1875 self.basedir = self.mktemp()
1876 d = self._setup_and_upload(k=2, n=4)
1878 # server 0: no shares
1879 # server 1: shares 0, 3
1882 # The order that they get queries is 0, 1, 3, 2
1884 self._add_server(server_number=0)
1885 self._add_server_with_share(server_number=1, share_number=0)
1886 self._add_server_with_share(server_number=2, share_number=2)
1887 self._add_server_with_share(server_number=3, share_number=1)
1889 self._copy_share_to_server(3, 1)
1890 storedir = self.get_serverdir(0)
1891 # remove the storedir, wiping out any existing shares
1892 shutil.rmtree(storedir)
1893 # create an empty storedir to replace the one we just removed
1895 client = self.g.clients[0]
1896 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1899 d.addCallback(_setup)
1900 d.addCallback(lambda client:
1901 client.upload(upload.Data("data" * 10000, convergence="")))
1902 d.addCallback(lambda ign:
1903 self.failUnless(self._has_happy_share_distribution()))
1906 def test_upload_succeeds_with_some_homeless_shares(self):
# NOTE(review): the `readonly=True)` continuation lines and `return c`
# are elided in this listing; code kept byte-identical.
# Purpose: with four read-only single-share servers and happy=4, the upload
# can satisfy servers-of-happiness without placing every share; it must
# still be declared successful.
1907 # If the upload is forced to stop trying to place shares before
1908 # it has placed (or otherwise accounted) for all of them, but it
1909 # has placed enough to satisfy the upload health criteria that
1910 # we're using, it should still succeed.
1911 self.basedir = self.mktemp()
1912 d = self._setup_and_upload()
1913 def _server_setup(ign):
1914 # Add four servers so that we have a layout like this:
1915 # server 1: share 0, read-only
1916 # server 2: share 1, read-only
1917 # server 3: share 2, read-only
1918 # server 4: share 3, read-only
1919 # If we set happy = 4, the upload will manage to satisfy
1920 # servers of happiness, but not place all of the shares; we
1921 # want to test that the upload is declared successful in
1923 self._add_server_with_share(server_number=1, share_number=0,
1925 self._add_server_with_share(server_number=2, share_number=1,
1927 self._add_server_with_share(server_number=3, share_number=2,
1929 self._add_server_with_share(server_number=4, share_number=3,
1932 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1933 # Set the client appropriately
1934 c = self.g.clients[0]
1935 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1937 d.addCallback(_server_setup)
1938 d.addCallback(lambda client:
1939 client.upload(upload.Data("data" * 10000, convergence="")))
1940 d.addCallback(lambda ign:
1941 self.failUnless(self._has_happy_share_distribution()))
1945 def test_uploader_skips_over_servers_with_only_one_share(self):
1946 # We want to make sure that the redistribution logic ignores
1947 # servers with only one share, since placing these shares
1948 # elsewhere will at best keep happiness the same as it was, and
# NOTE(review): the continuation of this comment (original line 1949) is
# missing from this excerpt.
1950 self.basedir = self.mktemp()
1951 d = self._setup_and_upload()
1952 def _server_setup(ign):
1953 # Add some servers so that the upload will need to
1954 # redistribute, but will first pass over a couple of servers
1955 # that don't have enough shares to redistribute before
1956 # finding one that does have shares to redistribute.
1957 self._add_server_with_share(server_number=1, share_number=0)
1958 self._add_server_with_share(server_number=2, share_number=2)
1959 self._add_server_with_share(server_number=3, share_number=1)
1960 self._add_server_with_share(server_number=8, share_number=4)
1961 self._add_server_with_share(server_number=5, share_number=5)
1962 self._add_server_with_share(server_number=10, share_number=7)
# NOTE(review): the loop header that binds "i" (original line 1963,
# presumably "for i in xrange(1, 5):") is missing from this excerpt — as
# shown, "i" would be unbound. Confirm against upstream.
1964 self._copy_share_to_server(i, 2)
1965 return self.g.clients[0]
1966 d.addCallback(_server_setup)
1967 d.addCallback(lambda client:
1968 client.upload(upload.Data("data" * 10000, convergence="")))
1969 d.addCallback(lambda ign:
1970 self.failUnless(self._has_happy_share_distribution()))
# NOTE(review): the trailing "return d" (original lines 1971-73) is
# missing from this excerpt.
1974 def test_peer_selector_bucket_abort(self):
1975 # If peer selection for an upload fails due to an unhappy
1976 # layout, the peer selection process should abort the buckets it
1977 # allocates before failing, so that the space can be re-used.
1978 self.basedir = self.mktemp()
1979 self.set_up_grid(num_servers=5)
1981 # Try to upload a file with happy=7, which is unsatisfiable with
1982 # the current grid. This will fail, but should not take up any
1983 # space on the storage servers after it fails.
1984 client = self.g.clients[0]
1985 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
1986 d = defer.succeed(None)
1987 d.addCallback(lambda ignored:
1988 self.shouldFail(UploadUnhappinessError,
1989 "test_peer_selection_bucket_abort",
# NOTE(review): the shouldFail call is missing argument lines (original
# 1990 and 1992 — likely the expected-substring argument and the closing
# 'convergence="")))') in this excerpt; confirm against upstream.
1991 client.upload, upload.Data("data" * 10000,
1993 # wait for the abort messages to get there.
1994 def _turn_barrier(res):
# fireEventually defers through the reactor so the (fire-and-forget)
# abort messages can be delivered before we inspect the servers.
1995 return fireEventually(res)
1996 d.addCallback(_turn_barrier)
# NOTE(review): the "def _then(ignored):" header (original line 1997) is
# missing here — the loop below is its body. Confirm against upstream.
1998 for server in self.g.servers_by_number.values():
# After the failed upload, no server should still be holding
# allocated-but-unwritten bucket space.
1999 self.failUnlessEqual(server.allocated_size(), 0)
2000 d.addCallback(_then)
# NOTE(review): the trailing "return d" (original lines 2001-03) is
# missing from this excerpt.
2004 def test_encoder_bucket_abort(self):
2005 # If enough servers die in the process of encoding and uploading
2006 # a file to make the layout unhappy, we should cancel the
2007 # newly-allocated buckets before dying.
2008 self.basedir = self.mktemp()
2009 self.set_up_grid(num_servers=4)
2011 client = self.g.clients[0]
2012 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
2014 d = defer.succeed(None)
2015 d.addCallback(lambda ignored:
2016 self.shouldFail(UploadUnhappinessError,
2017 "test_encoder_bucket_abort",
# NOTE(review): an argument line of this shouldFail call (original line
# 2018, likely the expected-substring argument) is missing from this
# excerpt; confirm against upstream.
2019 self._do_upload_with_broken_servers, 1))
2020 def _turn_barrier(res):
# fireEventually defers through the reactor so the abort messages can
# reach the servers before we check their allocated sizes.
2021 return fireEventually(res)
2022 d.addCallback(_turn_barrier)
# NOTE(review): the "def _then(ignored):" header (original line 2023) is
# missing here — the loop below is its body. Confirm against upstream.
2024 for server in self.g.servers_by_number.values():
# All allocated buckets must have been cancelled after the failure.
2025 self.failUnlessEqual(server.allocated_size(), 0)
2026 d.addCallback(_then)
# NOTE(review): the trailing "return d" (original lines 2027-29) is
# missing from this excerpt.
2030 def _set_up_nodes_extra_config(self, clientdir):
# Rewrite the client's tahoe.cfg to force 7-of-12 encoding parameters
# for the system-level tests that use this hook.
2031 cfgfn = os.path.join(clientdir, "tahoe.cfg")
2032 oldcfg = open(cfgfn, "r").read()
2033 f = open(cfgfn, "wt")
# NOTE(review): original lines 2034-35 are missing from this excerpt
# (presumably they wrote back parts of oldcfg); as shown, oldcfg is read
# but never used. Also, neither file handle is explicitly closed here —
# consider "with open(...)" in a follow-up. Confirm against upstream.
2036 f.write("[client]\n")
2037 f.write("shares.needed = 7\n")
2038 f.write("shares.total = 12\n")
# NOTE(review): the remainder of this method (original lines 2039-43,
# likely more writes, f.close(), and the end of the def) is missing from
# this excerpt.
2044 # TODO: upload with exactly 75 peers (shares_of_happiness)
2045 # TODO: have a download fail
2046 # TODO: cancel a download (need to implement more cancel stuff)