1 # -*- coding: utf-8 -*-
4 from cStringIO import StringIO
5 from twisted.trial import unittest
6 from twisted.python.failure import Failure
7 from twisted.internet import defer
8 from foolscap.api import fireEventually
10 import allmydata # for __full_version__
11 from allmydata import uri, monitor, client
12 from allmydata.immutable import upload, encode
13 from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
14 from allmydata.util import log, base32
15 from allmydata.util.assertutil import precondition
16 from allmydata.util.deferredutil import DeferredListShouldSucceed
17 from allmydata.test.no_network import GridTestMixin
18 from allmydata.test.common_util import ShouldFailMixin
19 from allmydata.util.happinessutil import servers_of_happiness, \
20 shares_by_server, merge_servers
21 from allmydata.storage_client import StorageFarmBroker
22 from allmydata.storage.server import storage_index_to_dir
23 from allmydata.client import Client
def extract_uri(results):
    """Deferred-callback helper: pull the URI string out of an UploadResults."""
    uri_string = results.get_uri()
    return uri_string
30 # Some of these took longer than 480 seconds on Zandr's arm box, but this may
31 # have been due to an earlier test ERROR'ing out due to timeout, which seems
32 # to screw up subsequent tests.
35 class Uploadable(unittest.TestCase):
    def shouldEqual(self, data, expected):
        # Assert that 'data' is a list of str chunks whose concatenation
        # equals 'expected'.
        self.failUnless(isinstance(data, list))
        # NOTE(review): the bindings of 'e' (per-chunk) and 's' (the joined
        # string) come from lines elided in this view — presumably a loop
        # over the chunks and a "".join; confirm against the full file.
        self.failUnless(isinstance(e, str))
        self.failUnlessEqual(s, expected)
43 def test_filehandle_random_key(self):
44 return self._test_filehandle(convergence=None)
46 def test_filehandle_convergent_encryption(self):
47 return self._test_filehandle(convergence="some convergence string")
    def _test_filehandle(self, convergence):
        # Exercise upload.FileHandle's read/close contract against a 41-byte
        # source.
        # NOTE(review): the creation of 's' (the source filehandle) and of
        # 'd' (a Deferred chain seeded with u.get_size()) is elided from
        # this view — confirm against the full file.
        u = upload.FileHandle(s, convergence=convergence)
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
        d.addCallback(lambda res: s.close()) # that privilege is reserved for us
    def test_filename(self):
        # upload.FileName should read its data from a named file on disk.
        basedir = "upload/Uploadable/test_filename"
        fn = os.path.join(basedir, "file")
        # NOTE(review): the directory creation, the write of 41 'a' bytes to
        # 'fn', and the start of the Deferred chain 'd' are elided from this
        # view.
        u = upload.FileName(fn, convergence=None)
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close())
81 u = upload.Data(s, convergence=None)
83 d.addCallback(self.failUnlessEqual, 41)
84 d.addCallback(lambda res: u.read(1))
85 d.addCallback(self.shouldEqual, "a")
86 d.addCallback(lambda res: u.read(80))
87 d.addCallback(self.shouldEqual, "a"*40)
88 d.addCallback(lambda res: u.close())
91 class ServerError(Exception):
    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        # Replace the fake node's default encoding parameters wholesale.
        # NOTE(review): the opening of the 'p' dict literal (its k / happy /
        # n entries) and the closing brace are elided from this view.
        "max_segment_size": max_segsize,
        self.node.DEFAULT_ENCODING_PARAMETERS = p
class FakeStorageServer:
    # Stand-in for a remote storage server; its behavior is selected by
    # 'mode' (e.g. "good", "full", "first-fail", "second-fail", "small").
    def __init__(self, mode):
        # NOTE(review): initialization of self.mode / self.queries /
        # self.allocated and the branch choosing between the two version
        # dicts below are elided from this view.
        self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                         { "maximum-immutable-share-size": 2**32 },
                         "application-version": str(allmydata.__full_version__),
        # This second dict (10-byte share limit) is presumably the "small"
        # variant — confirm against the full file.
        self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                         { "maximum-immutable-share-size": 10 },
                         "application-version": str(allmydata.__full_version__),

    def callRemote(self, methname, *args, **kwargs):
        # Dispatch a "remote" call to the local method of the same name.
        # NOTE(review): the '_call' wrapper function and the creation of 'd'
        # (probably fireEventually()) are elided from this view.
        meth = getattr(self, methname)
        return meth(*args, **kwargs)
        d.addCallback(lambda res: _call())

    def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
                         sharenums, share_size, canary):
        #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
        # Simulate per-mode failures / refusals.
        # NOTE(review): the raise/return bodies of the mode branches below
        # are elided from this view.
        if self.mode == "first-fail":
            if self.queries == 0:
        if self.mode == "second-fail":
            if self.queries == 1:
        if self.mode == "full":
        elif self.mode == "already got them":
            return (set(sharenums), {},)
        # Record and allocate one FakeBucketWriter per requested share.
        for shnum in sharenums:
            self.allocated.append( (storage_index, shnum) )
        dict([( shnum, FakeBucketWriter(share_size) )
              for shnum in sharenums]),
class FakeBucketWriter:
    # a diagnostic version of storageserver.BucketWriter
    def __init__(self, size):
        # NOTE(review): initialization of self._size and self.closed is
        # elided from this view.
        self.data = StringIO()

    def callRemote(self, methname, *args, **kwargs):
        # Dispatch to the matching remote_* method.
        # NOTE(review): the '_call' wrapper and the creation of 'd' are
        # elided from this view.
        meth = getattr(self, "remote_" + methname)
        return meth(*args, **kwargs)
        d.addCallback(lambda res: _call())

    def callRemoteOnly(self, methname, *args, **kwargs):
        # Fire-and-forget variant: the resulting Deferred is dropped.
        d = self.callRemote(methname, *args, **kwargs)
        del d # callRemoteOnly ignores this

    def remote_write(self, offset, data):
        # Write 'data' at 'offset', enforcing the originally allocated size.
        precondition(not self.closed)
        precondition(offset >= 0)
        precondition(offset+len(data) <= self._size,
                     "offset=%d + data=%d > size=%d" %
                     (offset, len(data), self._size))
        self.data.seek(offset)
        self.data.write(data)

    def remote_close(self):
        precondition(not self.closed)
        # NOTE(review): the line marking self.closed = True is elided.

    def remote_abort(self):
        # NOTE(review): the method body is elided from this view.
188 DEFAULT_ENCODING_PARAMETERS = {"k":25,
191 "max_segment_size": 1*MiB,
193 def __init__(self, mode="good", num_servers=50):
194 self.num_servers = num_servers
195 if type(mode) is str:
196 mode = dict([i,mode] for i in range(num_servers))
197 servers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
198 for fakeid in range(self.num_servers) ]
199 self.storage_broker = StorageFarmBroker(None, permute_peers=True)
200 for (serverid, rref) in servers:
201 ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid),
202 "permutation-seed-base32": base32.b2a(serverid) }
203 self.storage_broker.test_add_rref(serverid, rref, ann)
204 self.last_servers = [s[1] for s in servers]
206 def log(self, *args, **kwargs):
    def get_encoding_parameters(self):
        """Return this fake client's default encoding parameters."""
        return self.DEFAULT_ENCODING_PARAMETERS

    def get_storage_broker(self):
        """Return the StorageFarmBroker built in __init__."""
        return self.storage_broker
212 _secret_holder = client.SecretHolder("lease secret", "convergence secret")
214 class GotTooFarError(Exception):
class GiganticUploadable(upload.FileHandle):
    # An uploadable that pretends to hold 'size' bytes of zeros without ever
    # materializing them; raises GotTooFarError if more than ~1MB is read.
    def __init__(self, size):
        # NOTE(review): initialization of self._size and self._fp (the read
        # cursor) is elided from this view.

    def get_encryption_key(self):
        # A fixed all-zeros key is fine for a size-limit test.
        return defer.succeed("\x00" * 16)

    # NOTE(review): the 'def get_size(self):' header is elided here.
        return defer.succeed(self._size)

    def read(self, length):
        left = self._size - self._fp
        length = min(left, length)
        # NOTE(review): the line advancing self._fp is elided here.
        if self._fp > 1000000:
            # terminate the test early.
            raise GotTooFarError("we shouldn't be allowed to get this far")
        return defer.succeed(["\x00" * length])
238 Once upon a time, there was a beautiful princess named Buttercup. She lived
239 in a magical land where every file was stored securely among millions of
240 machines, and nobody ever worried about their data being lost ever again.
243 assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
247 SIZE_LARGE = len(DATA)
def upload_data(uploader, data):
    """Upload in-memory bytes via *uploader*; returns the upload Deferred."""
    return uploader.upload(upload.Data(data, convergence=None))
def upload_filename(uploader, filename):
    """Upload the contents of *filename* via *uploader*; returns the Deferred."""
    return uploader.upload(upload.FileName(filename, convergence=None))
def upload_filehandle(uploader, fh):
    """Upload from an open filehandle via *uploader*; returns the Deferred."""
    return uploader.upload(upload.FileHandle(fh, convergence=None))
259 class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
261 self.node = FakeClient(mode="good")
262 self.u = upload.Uploader()
263 self.u.running = True
264 self.u.parent = self.node
266 def _check_small(self, newuri, size):
267 u = uri.from_string(newuri)
268 self.failUnless(isinstance(u, uri.LiteralFileURI))
269 self.failUnlessEqual(len(u.data), size)
271 def _check_large(self, newuri, size):
272 u = uri.from_string(newuri)
273 self.failUnless(isinstance(u, uri.CHKFileURI))
274 self.failUnless(isinstance(u.get_storage_index(), str))
275 self.failUnlessEqual(len(u.get_storage_index()), 16)
276 self.failUnless(isinstance(u.key, str))
277 self.failUnlessEqual(len(u.key), 16)
278 self.failUnlessEqual(u.size, size)
280 def get_data(self, size):
    def test_too_large(self):
        # we've removed the 4GiB share size limit (see ticket #346 for
        # details), but still have an 8-byte field, so the limit is now
        # 2**64, so make sure we reject files larger than that.
        k = 3; happy = 7; n = 10
        self.set_encoding_parameters(k, happy, n)
        # NOTE(review): the definition of 'big' (a value near 2**64) is
        # elided from this view.
        data1 = GiganticUploadable(big)
        d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
                            "This file is too large to be uploaded (data_size)",
                            self.u.upload, data1)
        data2 = GiganticUploadable(big-3)
        d.addCallback(lambda res:
                      self.shouldFail(FileTooLargeError,
                                      "test_too_large-data2",
                                      "This file is too large to be uploaded (offsets)",
                                      self.u.upload, data2))
        # I don't know where the actual limit is.. it depends upon how large
        # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
        # NOTE(review): the trailing 'return d' is elided from this view.
    def test_data_zero(self):
        # Zero-byte in-memory upload should yield a LIT URI.
        data = self.get_data(SIZE_ZERO)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        # NOTE(review): the trailing 'return d' is elided from this view.
    def test_data_small(self):
        # Small in-memory upload should yield a LIT URI.
        data = self.get_data(SIZE_SMALL)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        # NOTE(review): the trailing 'return d' is elided from this view.
    def test_data_large(self):
        # Large in-memory upload should yield a CHK URI.
        data = self.get_data(SIZE_LARGE)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        # NOTE(review): the trailing 'return d' is elided from this view.
    def test_data_large_odd_segments(self):
        # Large upload with a segment size that forces 3 segments (not a
        # power of two) — still must yield a valid CHK URI.
        data = self.get_data(SIZE_LARGE)
        segsize = int(SIZE_LARGE / 2.5)
        # we want 3 segments, since that's not a power of two
        self.set_encoding_parameters(25, 25, 100, segsize)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        # NOTE(review): the trailing 'return d' is elided from this view.
    def test_filehandle_zero(self):
        # Zero-byte filehandle upload should yield a LIT URI.
        data = self.get_data(SIZE_ZERO)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        # NOTE(review): the trailing 'return d' is elided from this view.
    def test_filehandle_small(self):
        # Small filehandle upload should yield a LIT URI.
        data = self.get_data(SIZE_SMALL)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        # NOTE(review): the trailing 'return d' is elided from this view.
    def test_filehandle_large(self):
        # Large filehandle upload should yield a CHK URI.
        data = self.get_data(SIZE_LARGE)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        # NOTE(review): the trailing 'return d' is elided from this view.
    def test_filename_zero(self):
        # Zero-byte upload by filename should yield a LIT URI.
        fn = "Uploader-test_filename_zero.data"
        data = self.get_data(SIZE_ZERO)
        # NOTE(review): the line writing 'data' to 'fn' is elided here.
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        # NOTE(review): the trailing 'return d' is elided from this view.
    def test_filename_small(self):
        # Small upload by filename should yield a LIT URI.
        fn = "Uploader-test_filename_small.data"
        data = self.get_data(SIZE_SMALL)
        # NOTE(review): the line writing 'data' to 'fn' is elided here.
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        # NOTE(review): the trailing 'return d' is elided from this view.
    def test_filename_large(self):
        # Large upload by filename should yield a CHK URI.
        fn = "Uploader-test_filename_large.data"
        data = self.get_data(SIZE_LARGE)
        # NOTE(review): the line writing 'data' to 'fn' is elided here.
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        # NOTE(review): the trailing 'return d' is elided from this view.
389 class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
390 def make_node(self, mode, num_servers=10):
391 self.node = FakeClient(mode, num_servers)
392 self.u = upload.Uploader()
393 self.u.running = True
394 self.u.parent = self.node
396 def _check_large(self, newuri, size):
397 u = uri.from_string(newuri)
398 self.failUnless(isinstance(u, uri.CHKFileURI))
399 self.failUnless(isinstance(u.get_storage_index(), str))
400 self.failUnlessEqual(len(u.get_storage_index()), 16)
401 self.failUnless(isinstance(u.key, str))
402 self.failUnlessEqual(len(u.key), 16)
403 self.failUnlessEqual(u.size, size)
    def test_first_error(self):
        # One good server among ten; the rest error on their first query.
        # The upload should still succeed.
        mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
        # NOTE(review): the self.make_node(mode) call appears to be elided
        # from this view.
        self.set_encoding_parameters(k=25, happy=1, n=50)
        d = upload_data(self.u, DATA)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        # NOTE(review): the trailing 'return d' is elided from this view.
    def test_first_error_all(self):
        # Every server fails the first query: the upload must fail with
        # UploadUnhappinessError and a useful message.
        self.make_node("first-fail")
        d = self.shouldFail(UploadUnhappinessError, "first_error_all",
                            "server selection failed",
                            upload_data, self.u, DATA)
        # NOTE(review): the 'def _check(f):' header is elided from this view.
            self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
            # there should also be a 'last failure was' message
            self.failUnlessIn("ServerError", str(f.value))
        d.addCallback(_check)
        # NOTE(review): the trailing 'return d' is elided from this view.
    def test_second_error(self):
        # we want to make sure we make it to a third pass. This means that
        # the first pass was insufficient to place all shares, and at least
        # one of second pass servers (other than the last one) accepted a
        # share (so we'll believe that a third pass will be useful). (if
        # everyone but the last server throws an error, then we'll send all
        # the remaining shares to the last server at the end of the second
        # pass, and if that succeeds, we won't make it to a third pass).
        #
        # we can achieve this 97.5% of the time by using 40 servers, having
        # 39 of them fail on the second request, leaving only one to succeed
        # on the second request. (we need to keep the number of servers low
        # enough to ensure a second pass with 100 shares).
        mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
        self.make_node(mode, 40)
        d = upload_data(self.u, DATA)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        # NOTE(review): the trailing 'return d' is elided from this view.
    def test_second_error_all(self):
        # Every server fails the second query: upload fails after placing
        # only the first pass's 10 shares.
        self.make_node("second-fail")
        d = self.shouldFail(UploadUnhappinessError, "second_error_all",
                            "server selection failed",
                            upload_data, self.u, DATA)
        # NOTE(review): the 'def _check(f):' header is elided from this view.
            self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
            # there should also be a 'last failure was' message
            self.failUnlessIn("ServerError", str(f.value))
        d.addCallback(_check)
        # NOTE(review): the trailing 'return d' is elided from this view.
458 class FullServer(unittest.TestCase):
460 self.node = FakeClient(mode="full")
461 self.u = upload.Uploader()
462 self.u.running = True
463 self.u.parent = self.node
465 def _should_fail(self, f):
466 self.failUnless(isinstance(f, Failure) and f.check(UploadUnhappinessError), f)
    def test_data_large(self):
        # Uploading to a grid of "full" servers should fail.
        # NOTE(review): the construction of 'data' and the trailing
        # 'return d' are elided from this view.
        d = upload_data(self.u, data)
        d.addBoth(self._should_fail)
474 class ServerSelection(unittest.TestCase):
476 def make_client(self, num_servers=50):
477 self.node = FakeClient(mode="good", num_servers=num_servers)
478 self.u = upload.Uploader()
479 self.u.running = True
480 self.u.parent = self.node
482 def get_data(self, size):
485 def _check_large(self, newuri, size):
486 u = uri.from_string(newuri)
487 self.failUnless(isinstance(u, uri.CHKFileURI))
488 self.failUnless(isinstance(u.get_storage_index(), str))
489 self.failUnlessEqual(len(u.get_storage_index()), 16)
490 self.failUnless(isinstance(u.key, str))
491 self.failUnlessEqual(len(u.key), 16)
492 self.failUnlessEqual(u.size, size)
    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        # Replace the fake node's default encoding parameters wholesale.
        # NOTE(review): the opening of the 'p' dict literal (its k / happy /
        # n entries) and the closing brace are elided from this view.
        "max_segment_size": max_segsize,
        self.node.DEFAULT_ENCODING_PARAMETERS = p
    def test_one_each(self):
        # if we have 50 shares, and there are 50 servers, and they all accept
        # a share, we should get exactly one share per server
        # NOTE(review): the make_client() call is elided from this view.
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(25, 30, 50)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        # NOTE(review): the 'def _check(res):' header is elided from this view.
            for s in self.node.last_servers:
                allocated = s.allocated
                self.failUnlessEqual(len(allocated), 1)
                self.failUnlessEqual(s.queries, 1)
        d.addCallback(_check)
        # NOTE(review): the trailing 'return d' is elided from this view.
    def test_two_each(self):
        # if we have 100 shares, and there are 50 servers, and they all
        # accept all shares, we should get exactly two shares per server
        # NOTE(review): the make_client() call is elided from this view.
        data = self.get_data(SIZE_LARGE)
        # if there are 50 servers, then happy needs to be <= 50
        self.set_encoding_parameters(50, 50, 100)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        # NOTE(review): the 'def _check(res):' header is elided from this view.
            for s in self.node.last_servers:
                allocated = s.allocated
                self.failUnlessEqual(len(allocated), 2)
                self.failUnlessEqual(s.queries, 2)
        d.addCallback(_check)
        # NOTE(review): the trailing 'return d' is elided from this view.
    def test_one_each_plus_one_extra(self):
        # if we have 51 shares, and there are 50 servers, then one server
        # gets two shares and the rest get just one
        # NOTE(review): the make_client() call is elided from this view.
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(24, 41, 51)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        # NOTE(review): the '_check' header, the got_one/got_two list setup,
        # and parts of the if/else bookkeeping below are elided from this
        # view.
            for s in self.node.last_servers:
                allocated = s.allocated
                self.failUnless(len(allocated) in (1,2), len(allocated))
                if len(allocated) == 1:
                    self.failUnlessEqual(s.queries, 1)
                    self.failUnlessEqual(s.queries, 2)
            self.failUnlessEqual(len(got_one), 49)
            self.failUnlessEqual(len(got_two), 1)
        d.addCallback(_check)
        # NOTE(review): the trailing 'return d' is elided from this view.
    def test_four_each(self):
        # if we have 200 shares, and there are 50 servers, then each server
        # gets 4 shares. The design goal is to accomplish this with only two
        # queries per server.
        # NOTE(review): the make_client() call is elided from this view.
        data = self.get_data(SIZE_LARGE)
        # if there are 50 servers, then happy should be no more than 50 if we
        self.set_encoding_parameters(100, 50, 200)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        # NOTE(review): the 'def _check(res):' header is elided from this view.
            for s in self.node.last_servers:
                allocated = s.allocated
                self.failUnlessEqual(len(allocated), 4)
                self.failUnlessEqual(s.queries, 2)
        d.addCallback(_check)
        # NOTE(review): the trailing 'return d' is elided from this view.
    def test_three_of_ten(self):
        # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
        # NOTE(review): the rest of this comment and the make_client(3)
        # call are elided from this view.
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 3, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        # NOTE(review): the '_check' header and 'counts = {}' are elided.
            for s in self.node.last_servers:
                allocated = s.allocated
                counts[len(allocated)] = counts.get(len(allocated), 0) + 1
            histogram = [counts.get(i, 0) for i in range(5)]
            self.failUnlessEqual(histogram, [0,0,0,2,1])
        d.addCallback(_check)
        # NOTE(review): the trailing 'return d' is elided from this view.
    def test_some_big_some_small(self):
        # 10 shares, 20 servers, but half the servers don't support a
        # share-size large enough for our file
        mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)])
        self.node = FakeClient(mode, num_servers=20)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 5, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        # we should have put one share each on the big servers, and zero
        # shares on the small servers
        # NOTE(review): the '_check' header, 'total_allocated = 0', and the
        # 'if p.mode == "good":' guard are elided from this view.
            for p in self.node.last_servers:
                self.failUnlessEqual(len(p.allocated), 1)
                elif p.mode == "small":
                    self.failUnlessEqual(len(p.allocated), 0)
                total_allocated += len(p.allocated)
            self.failUnlessEqual(total_allocated, 10)
        d.addCallback(_check)
        # NOTE(review): the trailing 'return d' is elided from this view.
class StorageIndex(unittest.TestCase):
    def test_params_must_matter(self):
        # The storage index must depend on the convergence secret, the
        # encoding parameters, and (for convergence=None) the random key.
        DATA = "I am some data"
        PARAMS = Client.DEFAULT_ENCODING_PARAMETERS

        u = upload.Data(DATA, convergence="")
        u.set_default_encoding_parameters(PARAMS)
        eu = upload.EncryptAnUploadable(u)
        d1 = eu.get_storage_index()

        # CHK means the same data should encrypt the same way
        u = upload.Data(DATA, convergence="")
        u.set_default_encoding_parameters(PARAMS)
        eu = upload.EncryptAnUploadable(u)
        d1a = eu.get_storage_index()

        # but if we use a different convergence string it should be different
        u = upload.Data(DATA, convergence="wheee!")
        u.set_default_encoding_parameters(PARAMS)
        eu = upload.EncryptAnUploadable(u)
        d1salt1 = eu.get_storage_index()

        # and if we add yet a different convergence it should be different again
        u = upload.Data(DATA, convergence="NOT wheee!")
        u.set_default_encoding_parameters(PARAMS)
        eu = upload.EncryptAnUploadable(u)
        d1salt2 = eu.get_storage_index()

        # and if we use the first string again it should be the same as last time
        u = upload.Data(DATA, convergence="wheee!")
        u.set_default_encoding_parameters(PARAMS)
        eu = upload.EncryptAnUploadable(u)
        d1salt1a = eu.get_storage_index()

        # and if we change the encoding parameters, it should be different (from the same convergence string with different encoding parameters)
        u = upload.Data(DATA, convergence="")
        u.set_default_encoding_parameters(PARAMS)
        u.encoding_param_k = u.default_encoding_param_k + 1
        eu = upload.EncryptAnUploadable(u)
        d2 = eu.get_storage_index()

        # and if we use a random key, it should be different than the CHK
        u = upload.Data(DATA, convergence=None)
        u.set_default_encoding_parameters(PARAMS)
        eu = upload.EncryptAnUploadable(u)
        d3 = eu.get_storage_index()
        # and different from another instance
        u = upload.Data(DATA, convergence=None)
        u.set_default_encoding_parameters(PARAMS)
        eu = upload.EncryptAnUploadable(u)
        d4 = eu.get_storage_index()

        d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
        # NOTE(review): the 'def _check(res):' header is elided from this view.
            si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
            self.failUnlessEqual(si1, si1a)
            self.failIfEqual(si1, si2)
            self.failIfEqual(si1, si3)
            self.failIfEqual(si1, si4)
            self.failIfEqual(si3, si4)
            self.failIfEqual(si1salt1, si1)
            self.failIfEqual(si1salt1, si1salt2)
            self.failIfEqual(si1salt2, si1)
            self.failUnlessEqual(si1salt1, si1salt1a)
        # NOTE(review): 'd.addCallback(_check)' and 'return d' are elided.
703 # copied from python docs because itertools.combinations was added in
704 # python 2.6 and we support >= 2.4.
def combinations(iterable, r):
    # combinations('ABCD', 2) --> AB AC AD BC BD CD
    # combinations(range(4), 3) --> 012 013 023 123
    pool = tuple(iterable)
    # NOTE(review): the initialization of 'n' and 'indices', the length
    # guard, and parts of the index-advancing loop are elided from this
    # view — see the itertools.combinations recipe in the Python docs.
    yield tuple(pool[i] for i in indices)
    for i in reversed(range(r)):
        if indices[i] != i + n - r:
        for j in range(i+1, r):
            indices[j] = indices[j-1] + 1
        yield tuple(pool[i] for i in indices)
def is_happy_enough(servertoshnums, h, k):
    """ I calculate whether servertoshnums achieves happiness level h. I do this with a naïve "brute force search" approach. (See src/allmydata/util/happinessutil.py for a better algorithm.) """
    if len(servertoshnums) < h:
        # NOTE(review): the 'return False' for this guard is elided here.
    # print "servertoshnums: ", servertoshnums, h, k
    for happysetcombo in combinations(servertoshnums.iterkeys(), h):
        # print "happysetcombo: ", happysetcombo
        for subsetcombo in combinations(happysetcombo, k):
            # Every k-subset of every h-subset of servers must jointly hold
            # enough distinct shares; the check itself is elided here.
            shnums = reduce(set.union, [ servertoshnums[s] for s in subsetcombo ])
            # print "subsetcombo: ", subsetcombo, ", shnums: ", shnums
            # print "NOT HAPPY", shnums, k
class FakeServerTracker:
    """Minimal stand-in for a ServerTracker: a server id plus its buckets."""

    def __init__(self, serverid, buckets):
        self._serverid = serverid
        self.buckets = buckets

    def get_serverid(self):
        """Return the server id this tracker stands in for."""
        return self._serverid
748 class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
    def find_all_shares(self, unused=None):
        """Locate shares on disk. Returns a dict that maps
        server to set of sharenums.
        """
        assert self.g, "I tried to find a grid at self.g, but failed"
        servertoshnums = {} # k: server, v: set(shnum)
        for i, c in self.g.servers_by_number.iteritems():
            for (dirp, dirns, fns) in os.walk(c.sharedir):
                # NOTE(review): the per-filename loop that parses 'sharenum'
                # out of each share file name is elided from this view.
                # Whoops, I guess that's not a share file then.
                servertoshnums.setdefault(i, set()).add(sharenum)
        return servertoshnums
    def _do_upload_with_broken_servers(self, servers_to_break):
        """
        I act like a normal upload, but before I send the results of
        Tahoe2ServerSelector to the Encoder, I break the first
        servers_to_break ServerTrackers in the upload_servers part of the
        selector's result.
        """
        assert self.g, "I tried to find a grid at self.g, but failed"
        broker = self.g.clients[0].storage_broker
        sh = self.g.clients[0]._secret_holder
        data = upload.Data("data" * 10000, convergence="")
        data.set_default_encoding_parameters({'k': 3, 'happy': 4, 'n': 10})
        uploadable = upload.EncryptAnUploadable(data)
        encoder = encode.Encoder()
        encoder.set_encrypted_uploadable(uploadable)
        status = upload.UploadStatus()
        selector = upload.Tahoe2ServerSelector("dglev", "test", status)
        storage_index = encoder.get_param("storage_index")
        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        d = selector.get_shareholders(broker, sh, storage_index,
                                      share_size, block_size, num_segments,
        # NOTE(review): the trailing arguments of this call are elided.
        def _have_shareholders((upload_trackers, already_servers)):
            assert servers_to_break <= len(upload_trackers)
            # Abort the buckets of the first servers_to_break trackers.
            for index in xrange(servers_to_break):
                tracker = list(upload_trackers)[index]
                for share in tracker.buckets.keys():
                    tracker.buckets[share].abort()
            # NOTE(review): the initialization of 'buckets' is elided here.
            servermap = already_servers.copy()
            for tracker in upload_trackers:
                buckets.update(tracker.buckets)
                for bucket in tracker.buckets:
                    servermap.setdefault(bucket, set()).add(tracker.get_serverid())
            encoder.set_shareholders(buckets, servermap)
            # NOTE(review): the value returned here (presumably starting the
            # encoder) is elided from this view — confirm.
        d.addCallback(_have_shareholders)
        # NOTE(review): the trailing 'return d' is elided from this view.
812 def _has_happy_share_distribution(self):
813 servertoshnums = self.find_all_shares()
814 k = self.g.clients[0].DEFAULT_ENCODING_PARAMETERS['k']
815 h = self.g.clients[0].DEFAULT_ENCODING_PARAMETERS['happy']
816 return is_happy_enough(servertoshnums, h, k)
818 def _add_server(self, server_number, readonly=False):
819 assert self.g, "I tried to find a grid at self.g, but failed"
820 ss = self.g.make_server(server_number, readonly)
821 log.msg("just created a server, number: %s => %s" % (server_number, ss,))
822 self.g.add_server(server_number, ss)
    # NOTE(review): the tail of this signature (presumably a
    # 'readonly=False):' continuation line) is elided from this view.
    def _add_server_with_share(self, server_number, share_number=None,
        self._add_server(server_number, readonly)
        if share_number is not None:
            self._copy_share_to_server(share_number, server_number)
    def _copy_share_to_server(self, share_number, server_number):
        # Copy one share of self.uri from the first server's directory into
        # server 'server_number''s share directory, then assert it arrived.
        ss = self.g.servers_by_number[server_number]
        # Copy share i from the directory associated with the first
        # storage server to the directory associated with this one.
        assert self.g, "I tried to find a grid at self.g, but failed"
        assert self.shares, "I tried to find shares at self.shares, but failed"
        old_share_location = self.shares[share_number][2]
        new_share_location = os.path.join(ss.storedir, "shares")
        si = uri.from_string(self.uri).get_storage_index()
        new_share_location = os.path.join(new_share_location,
                                          storage_index_to_dir(si))
        if not os.path.exists(new_share_location):
            os.makedirs(new_share_location)
        # NOTE(review): the final path component of this join (presumably
        # str(share_number)) is elided from this view.
        new_share_location = os.path.join(new_share_location,
        if old_share_location != new_share_location:
            shutil.copy(old_share_location, new_share_location)
        shares = self.find_uri_shares(self.uri)
        # Make sure that the storage server has the share.
        # NOTE(review): the 'in shares' tail of this assertion is elided.
        self.failUnless((share_number, ss.my_nodeid, new_share_location)
    def _setup_grid(self):
        """
        I set up a NoNetworkGrid with a single server and client.
        """
        self.set_up_grid(num_clients=1, num_servers=1)
    def _setup_and_upload(self, **kwargs):
        """
        I set up a NoNetworkGrid with a single server and client,
        upload a file to it, store its uri in self.uri, and store its
        sharedata in self.shares.
        """
        # NOTE(review): the self._setup_grid() call is elided from this view.
        client = self.g.clients[0]
        client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
        if "n" in kwargs and "k" in kwargs:
            client.DEFAULT_ENCODING_PARAMETERS['k'] = kwargs['k']
            client.DEFAULT_ENCODING_PARAMETERS['n'] = kwargs['n']
        data = upload.Data("data" * 10000, convergence="")
        d = client.upload(data)
        # NOTE(review): the 'def _store_uri(ur):' header is elided here.
            self.uri = ur.get_uri()
        d.addCallback(_store_uri)
        d.addCallback(lambda ign:
                      self.find_uri_shares(self.uri))
        def _store_shares(shares):
            # NOTE(review): the body storing self.shares is elided here.
        d.addCallback(_store_shares)
        # NOTE(review): the trailing 'return d' is elided from this view.
    def test_configure_parameters(self):
        # Encoding parameters configured via the client-config hook (k=7,
        # n=12, per the assertions below) must be honored by upload.
        self.basedir = self.mktemp()
        hooks = {0: self._set_up_nodes_extra_config}
        self.set_up_grid(client_config_hooks=hooks)
        c0 = self.g.clients[0]
        # NOTE(review): the DATA definition and the upload call creating
        # 'd' are elided from this view.
        u = upload.Data(DATA, convergence="")
        d.addCallback(lambda ur: c0.create_node_from_uri(ur.get_uri()))
        m = monitor.Monitor()
        d.addCallback(lambda fn: fn.check(m))
        # NOTE(review): the 'def _check(cr):' header is elided from this view.
            self.failUnlessEqual(cr.get_encoding_needed(), 7)
            self.failUnlessEqual(cr.get_encoding_expected(), 12)
        d.addCallback(_check)
        # NOTE(review): the trailing 'return d' is elided from this view.
903 def _setUp(self, ns):
904 # Used by test_happy_semantics and test_preexisting_share_behavior
905 # to set up the grid.
906 self.node = FakeClient(mode="good", num_servers=ns)
907 self.u = upload.Uploader()
908 self.u.running = True
909 self.u.parent = self.node
    def test_happy_semantics(self):
        # NOTE(review): the initial self._setUp(...) call (2 servers) is
        # elided from this view.
        DATA = upload.Data("kittens" * 10000, convergence="")
        # These parameters are unsatisfiable with only 2 servers.
        self.set_encoding_parameters(k=3, happy=5, n=10)
        d = self.shouldFail(UploadUnhappinessError, "test_happy_semantics",
                            "shares could be placed or found on only 2 "
                            "server(s). We were asked to place shares on "
                            "at least 5 server(s) such that any 3 of them "
                            "have enough shares to recover the file",
        # NOTE(review): the upload callable arguments are elided here.
        # Let's reset the client to have 10 servers
        d.addCallback(lambda ign:
        # NOTE(review): the _setUp(10) call is elided here.
        # These parameters are satisfiable with 10 servers.
        d.addCallback(lambda ign:
                      self.set_encoding_parameters(k=3, happy=5, n=10))
        d.addCallback(lambda ign:
        # NOTE(review): the upload call is elided here.
        # Let's reset the client to have 7 servers
        # (this is less than n, but more than h)
        d.addCallback(lambda ign:
        # NOTE(review): the _setUp(7) call is elided here.
        # These parameters are satisfiable with 7 servers.
        d.addCallback(lambda ign:
                      self.set_encoding_parameters(k=3, happy=5, n=10))
        d.addCallback(lambda ign:
        # NOTE(review): the final upload and 'return d' are elided.
    def test_aborted_shares(self):
        # An aborted BucketWriter must release its claim, so an upload that
        # failed with 4 servers can succeed once a 5th server is added.
        self.basedir = "upload/EncodingParameters/aborted_shares"
        self.set_up_grid(num_servers=4)
        c = self.g.clients[0]
        DATA = upload.Data(100* "kittens", convergence="")
        # These parameters are unsatisfiable with only 4 servers, but should
        # work with 5, as long as the original 4 are not stuck in the open
        # BucketWriter state (open() but not
        parms = {"k":2, "happy":5, "n":5, "max_segment_size": 1*MiB}
        c.DEFAULT_ENCODING_PARAMETERS = parms
        d = self.shouldFail(UploadUnhappinessError, "test_aborted_shares",
                            "shares could be placed on only 4 "
                            "server(s) such that any 2 of them have enough "
                            "shares to recover the file, but we were asked "
                            "to place shares on at least 5 such servers",
        # NOTE(review): the upload callable arguments are elided here.
        # now add the 5th server
        d.addCallback(lambda ign: self._add_server(4, False))
        # and this time the upload ought to succeed
        d.addCallback(lambda ign: c.upload(DATA))
        d.addCallback(lambda ign:
                      self.failUnless(self._has_happy_share_distribution()))
        # NOTE(review): the trailing 'return d' is elided from this view.
    def test_problem_layout_comment_52(self):
        # Reproduce the share layout from ticket #778 comment:52 and verify
        # that upload fails with UploadUnhappinessError in both variants.
        self.basedir = self.mktemp()
        # This scenario is at
        # http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:52
        # The scenario in comment:52 proposes that we have a layout
        # server 0: shares 1 - 9
        # server 1: share 0, read-only
        # server 2: share 0, read-only
        # server 3: share 0, read-only
        # To get access to the shares, we will first upload to one
        # server, which will then have shares 0 - 9. We'll then
        # add three new servers, configure them to not accept any new
        # shares, then write share 0 directly into the serverdir of each,
        # and then remove share 0 from server 0 in the same way.
        # Then each of servers 1 - 3 will report that they have share 0,
        # and will not accept any new share, while server 0 will report that
        # it has shares 1 - 9 and will accept new shares.
        # We'll then set 'happy' = 4, and see that an upload fails
        d = self._setup_and_upload()
        # NOTE(review): the 'readonly=True))' continuation of each of these
        # three calls is elided from this view.
        d.addCallback(lambda ign:
                      self._add_server_with_share(server_number=1, share_number=0,
        d.addCallback(lambda ign:
                      self._add_server_with_share(server_number=2, share_number=0,
        d.addCallback(lambda ign:
                      self._add_server_with_share(server_number=3, share_number=0,
        # Remove the first share from server 0.
        def _remove_share_0_from_server_0():
            share_location = self.shares[0][2]
            os.remove(share_location)
        d.addCallback(lambda ign:
                      _remove_share_0_from_server_0())
        # Set happy = 4 in the client.
        # NOTE(review): the enclosing helper header (presumably
        # 'def _prepare(ign):') and its 'return client' are elided here.
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
        d.addCallback(lambda ign:
        # NOTE(review): the call completing this addCallback is elided.
        # Uploading data should fail
        d.addCallback(lambda client:
                      self.shouldFail(UploadUnhappinessError,
                                      "test_problem_layout_comment_52_test_1",
                                      "shares could be placed or found on 4 server(s), "
                                      "but they are not spread out evenly enough to "
                                      "ensure that any 3 of these servers would have "
                                      "enough shares to recover the file. "
                                      "We were asked to place shares on at "
                                      "least 4 servers such that any 3 of them have "
                                      "enough shares to recover the file",
                                      client.upload, upload.Data("data" * 10000,
        # Do comment:52, but like this:
        # server 3: share 0, read-only
        # server 1: share 0, read-only
        # server 0: shares 0-9
        d.addCallback(lambda ign:
        # NOTE(review): the basedir reset call is elided here.
        d.addCallback(lambda ign:
                      self._setup_and_upload())
        d.addCallback(lambda ign:
                      self._add_server(server_number=2))
        d.addCallback(lambda ign:
                      self._add_server_with_share(server_number=3, share_number=0,
        d.addCallback(lambda ign:
                      self._add_server_with_share(server_number=1, share_number=0,
        # NOTE(review): the helper header wrapping the happy=4 reset below
        # is elided here.
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
        d.addCallback(lambda ign:
        d.addCallback(lambda client:
                      self.shouldFail(UploadUnhappinessError,
                                      "test_problem_layout_comment_52_test_2",
                                      "shares could be placed on only 3 server(s) such "
                                      "that any 3 of them have enough shares to recover "
                                      "the file, but we were asked to place shares on "
                                      "at least 4 such servers.",
                                      client.upload, upload.Data("data" * 10000,
# Regression test for ticket #778 comment:53: three successive grid
# layouts where a re-upload should succeed and leave the shares in a
# "happy" distribution (checked via _has_happy_share_distribution); the
# last scenario also asserts that only the minimum number of shares
# needed for happiness (3) is actually pushed.
# NOTE(review): this copy of the method appears to have interior lines
# elided (internal line numbers skip); the code below is not
# syntactically complete as shown — compare against the original file.
1062 def test_problem_layout_comment_53(self):
1063 # This scenario is at
1064 # http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:53
1066 # Set up the grid to have one server
1067 def _change_basedir(ign):
1068 self.basedir = self.mktemp()
1069 _change_basedir(None)
1070 # We start by uploading all of the shares to one server.
1071 # Next, we'll add three new servers to our NoNetworkGrid. We'll add
1072 # one share from our initial upload to each of these.
1073 # The counterintuitive ordering of the share numbers is to deal with
1074 # the permuting of these servers -- distributing the shares this
1075 # way ensures that the Tahoe2ServerSelector sees them in the order
1077 d = self._setup_and_upload()
1078 d.addCallback(lambda ign:
1079 self._add_server_with_share(server_number=1, share_number=2))
1080 d.addCallback(lambda ign:
1081 self._add_server_with_share(server_number=2, share_number=0))
1082 d.addCallback(lambda ign:
1083 self._add_server_with_share(server_number=3, share_number=1))
1084 # So, we now have the following layout:
1085 # server 0: shares 0 - 9
1089 # We change the 'happy' parameter in the client to 4.
1090 # The Tahoe2ServerSelector will see the servers permuted as:
1092 # Ideally, a reupload of our original data should work.
1093 def _reset_encoding_parameters(ign, happy=4):
1094 client = self.g.clients[0]
1095 client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1097 d.addCallback(_reset_encoding_parameters)
1098 d.addCallback(lambda client:
1099 client.upload(upload.Data("data" * 10000, convergence="")))
1100 d.addCallback(lambda ign:
1101 self.failUnless(self._has_happy_share_distribution()))
1104 # This scenario is basically comment:53, but changed so that the
1105 # Tahoe2ServerSelector sees the server with all of the shares before
1106 # any of the other servers.
1108 # server 2: shares 0 - 9
1112 # The Tahoe2ServerSelector sees the servers permuted as:
1114 # Note that server 0 has been replaced by server 4; this makes it
1115 # easier to ensure that the last server seen by Tahoe2ServerSelector
1116 # has only one share.
1117 d.addCallback(_change_basedir)
1118 d.addCallback(lambda ign:
1119 self._setup_and_upload())
1120 d.addCallback(lambda ign:
1121 self._add_server_with_share(server_number=2, share_number=0))
1122 d.addCallback(lambda ign:
1123 self._add_server_with_share(server_number=3, share_number=1))
1124 d.addCallback(lambda ign:
1125 self._add_server_with_share(server_number=1, share_number=2))
1126 # Copy all of the other shares to server number 2
1127 def _copy_shares(ign):
1128 for i in xrange(0, 10):
1129 self._copy_share_to_server(i, 2)
1130 d.addCallback(_copy_shares)
1131 # Remove the first server, and add a placeholder with share 0
1132 d.addCallback(lambda ign:
1133 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1134 d.addCallback(lambda ign:
1135 self._add_server_with_share(server_number=4, share_number=0))
1136 # Now try uploading.
1137 d.addCallback(_reset_encoding_parameters)
1138 d.addCallback(lambda client:
1139 client.upload(upload.Data("data" * 10000, convergence="")))
1140 d.addCallback(lambda ign:
1141 self.failUnless(self._has_happy_share_distribution()))
1144 # Try the same thing, but with empty servers after the first one
1145 # We want to make sure that Tahoe2ServerSelector will redistribute
1146 # shares as necessary, not simply discover an existing layout.
1148 # server 2: shares 0 - 9
1152 d.addCallback(_change_basedir)
1153 d.addCallback(lambda ign:
1154 self._setup_and_upload())
1155 d.addCallback(lambda ign:
1156 self._add_server(server_number=2))
1157 d.addCallback(lambda ign:
1158 self._add_server(server_number=3))
1159 d.addCallback(lambda ign:
1160 self._add_server(server_number=1))
1161 d.addCallback(lambda ign:
1162 self._add_server(server_number=4))
1163 d.addCallback(_copy_shares)
1164 d.addCallback(lambda ign:
1165 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1166 d.addCallback(_reset_encoding_parameters)
1167 d.addCallback(lambda client:
1168 client.upload(upload.Data("data" * 10000, convergence="")))
1169 # Make sure that only as many shares as necessary to satisfy
1170 # servers of happiness were pushed.
1171 d.addCallback(lambda results:
1172 self.failUnlessEqual(results.get_pushed_shares(), 3))
1173 d.addCallback(lambda ign:
1174 self.failUnless(self._has_happy_share_distribution()))
# Regression test for ticket #1124 (k=2, n=4 layout that the uploader
# should accept as already distributed well enough). Marked .todo below,
# so Trial expects it to fail until the uploader is fixed.
# NOTE(review): this copy of the method appears to have interior lines
# elided (internal line numbers skip); the code below is not
# syntactically complete as shown — compare against the original file.
1177 def test_problem_layout_ticket_1124(self):
1178 self.basedir = self.mktemp()
1179 d = self._setup_and_upload(k=2, n=4)
1181 # server 0: shares 0, 1, 2, 3
1182 # server 1: shares 0, 3
1185 # With this layout, an upload should just be satisfied that the current distribution is good enough, right?
1187 self._add_server_with_share(server_number=0, share_number=None)
1188 self._add_server_with_share(server_number=1, share_number=0)
1189 self._add_server_with_share(server_number=2, share_number=1)
1190 self._add_server_with_share(server_number=3, share_number=2)
1192 self._copy_share_to_server(3, 1)
1193 client = self.g.clients[0]
1194 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1197 d.addCallback(_setup)
1198 d.addCallback(lambda client:
1199 client.upload(upload.Data("data" * 10000, convergence="")))
1200 d.addCallback(lambda ign:
1201 self.failUnless(self._has_happy_share_distribution()))
1203 test_problem_layout_ticket_1124.todo = "Fix this after 1.7.1 release."
# Upload must still achieve happiness when several servers holding
# shares are read-only: one full writable server plus three read-only
# servers each holding one share, happy=4.
# NOTE(review): this copy of the method appears to have interior lines
# elided (internal line numbers skip); the code below is not
# syntactically complete as shown — compare against the original file.
1205 def test_happiness_with_some_readonly_servers(self):
1206 # Try the following layout
1207 # server 2: shares 0-9
1208 # server 4: share 0, read-only
1209 # server 3: share 1, read-only
1210 # server 1: share 2, read-only
1211 self.basedir = self.mktemp()
1212 d = self._setup_and_upload()
1213 d.addCallback(lambda ign:
1214 self._add_server_with_share(server_number=2, share_number=0))
1215 d.addCallback(lambda ign:
1216 self._add_server_with_share(server_number=3, share_number=1,
1218 d.addCallback(lambda ign:
1219 self._add_server_with_share(server_number=1, share_number=2,
1221 # Copy all of the other shares to server number 2
1222 def _copy_shares(ign):
1223 for i in xrange(1, 10):
1224 self._copy_share_to_server(i, 2)
1225 d.addCallback(_copy_shares)
1226 # Remove server 0, and add another in its place
1227 d.addCallback(lambda ign:
1228 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1229 d.addCallback(lambda ign:
1230 self._add_server_with_share(server_number=4, share_number=0,
1232 def _reset_encoding_parameters(ign, happy=4):
1233 client = self.g.clients[0]
1234 client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1236 d.addCallback(_reset_encoding_parameters)
1237 d.addCallback(lambda client:
1238 client.upload(upload.Data("data" * 10000, convergence="")))
1239 d.addCallback(lambda ign:
1240 self.failUnless(self._has_happy_share_distribution()))
# Like the previous test but with every server read-only: the read-only
# share-discovery pass must not undercount servers of happiness.
# NOTE(review): this copy of the method appears to have interior lines
# elided (internal line numbers skip); the code below is not
# syntactically complete as shown — compare against the original file.
1244 def test_happiness_with_all_readonly_servers(self):
1245 # server 3: share 1, read-only
1246 # server 1: share 2, read-only
1247 # server 2: shares 0-9, read-only
1248 # server 4: share 0, read-only
1249 # The idea with this test is to make sure that the survey of
1250 # read-only servers doesn't undercount servers of happiness
1251 self.basedir = self.mktemp()
1252 d = self._setup_and_upload()
1253 d.addCallback(lambda ign:
1254 self._add_server_with_share(server_number=4, share_number=0,
1256 d.addCallback(lambda ign:
1257 self._add_server_with_share(server_number=3, share_number=1,
1259 d.addCallback(lambda ign:
1260 self._add_server_with_share(server_number=1, share_number=2,
1262 d.addCallback(lambda ign:
1263 self._add_server_with_share(server_number=2, share_number=0,
1265 def _copy_shares(ign):
1266 for i in xrange(1, 10):
1267 self._copy_share_to_server(i, 2)
1268 d.addCallback(_copy_shares)
1269 d.addCallback(lambda ign:
1270 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1271 def _reset_encoding_parameters(ign, happy=4):
1272 client = self.g.clients[0]
1273 client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1275 d.addCallback(_reset_encoding_parameters)
1276 d.addCallback(lambda client:
1277 client.upload(upload.Data("data" * 10000, convergence="")))
1278 d.addCallback(lambda ign:
1279 self.failUnless(self._has_happy_share_distribution()))
# The Encoder re-checks servers_of_happiness when buckets are lost
# mid-upload. Simulate servers that break after selection but before
# being written to: losing 1 of 5 servers still succeeds (happiness 4
# achievable), losing 2 must raise UploadUnhappinessError; repeated with
# a mix of read-only and breakable servers to check the accounting.
# NOTE(review): this copy of the method appears to have interior lines
# elided (internal line numbers skip); the code below is not
# syntactically complete as shown — compare against the original file.
1283 def test_dropped_servers_in_encoder(self):
1284 # The Encoder does its own "servers_of_happiness" check if it
1285 # happens to lose a bucket during an upload (it assumes that
1286 # the layout presented to it satisfies "servers_of_happiness"
1287 # until a failure occurs)
1289 # This test simulates an upload where servers break after server
1290 # selection, but before they are written to.
1291 def _set_basedir(ign=None):
1292 self.basedir = self.mktemp()
1294 d = self._setup_and_upload();
1296 def _do_server_setup(ign):
1297 self._add_server(server_number=1)
1298 self._add_server(server_number=2)
1299 self._add_server(server_number=3)
1300 self._add_server(server_number=4)
1301 self._add_server(server_number=5)
1302 d.addCallback(_do_server_setup)
1303 # remove the original server
1304 # (necessary to ensure that the Tahoe2ServerSelector will distribute
1306 def _remove_server(ign):
1307 server = self.g.servers_by_number[0]
1308 self.g.remove_server(server.my_nodeid)
1309 d.addCallback(_remove_server)
1310 # This should succeed; we still have 4 servers, and the
1311 # happiness of the upload is 4.
1312 d.addCallback(lambda ign:
1313 self._do_upload_with_broken_servers(1))
1314 # Now, do the same thing over again, but drop 2 servers instead
1315 # of 1. This should fail, because servers_of_happiness is 4 and
1316 # we can't satisfy that.
1317 d.addCallback(_set_basedir)
1318 d.addCallback(lambda ign:
1319 self._setup_and_upload())
1320 d.addCallback(_do_server_setup)
1321 d.addCallback(_remove_server)
1322 d.addCallback(lambda ign:
1323 self.shouldFail(UploadUnhappinessError,
1324 "test_dropped_servers_in_encoder",
1325 "shares could be placed on only 3 server(s) "
1326 "such that any 3 of them have enough shares to "
1327 "recover the file, but we were asked to place "
1328 "shares on at least 4",
1329 self._do_upload_with_broken_servers, 2))
1330 # Now do the same thing over again, but make some of the servers
1331 # readonly, break some of the ones that aren't, and make sure that
1332 # happiness accounting is preserved.
1333 d.addCallback(_set_basedir)
1334 d.addCallback(lambda ign:
1335 self._setup_and_upload())
1336 def _do_server_setup_2(ign):
1340 self._add_server_with_share(4, 7, readonly=True)
1341 self._add_server_with_share(5, 8, readonly=True)
1342 d.addCallback(_do_server_setup_2)
1343 d.addCallback(_remove_server)
1344 d.addCallback(lambda ign:
1345 self._do_upload_with_broken_servers(1))
1346 d.addCallback(_set_basedir)
1347 d.addCallback(lambda ign:
1348 self._setup_and_upload())
1349 d.addCallback(_do_server_setup_2)
1350 d.addCallback(_remove_server)
1351 d.addCallback(lambda ign:
1352 self.shouldFail(UploadUnhappinessError,
1353 "test_dropped_servers_in_encoder",
1354 "shares could be placed on only 3 server(s) "
1355 "such that any 3 of them have enough shares to "
1356 "recover the file, but we were asked to place "
1357 "shares on at least 4",
1358 self._do_upload_with_broken_servers, 2))
# Unit test for happinessutil.merge_servers(): merging a shnum ->
# set(serverid) dict with a set of tracker objects (FakeServerTracker
# stand-ins) must union the mappings; an empty tracker set returns the
# first argument unchanged.
# NOTE(review): this copy of the method appears to have interior lines
# elided (internal line numbers skip; e.g. the dict-literal openers for
# `shares`, `expected`, etc. are missing); the code below is not
# syntactically complete as shown — compare against the original file.
1362 def test_merge_servers(self):
1363 # merge_servers merges a list of upload_servers and a dict of
1364 # shareid -> serverid mappings.
1366 1 : set(["server1"]),
1367 2 : set(["server2"]),
1368 3 : set(["server3"]),
1369 4 : set(["server4", "server5"]),
1370 5 : set(["server1", "server2"]),
1372 # if not provided with a upload_servers argument, it should just
1373 # return the first argument unchanged.
1374 self.failUnlessEqual(shares, merge_servers(shares, set([])))
1376 for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1377 t = FakeServerTracker(server, [i])
1380 1 : set(["server1"]),
1381 2 : set(["server2"]),
1382 3 : set(["server3"]),
1383 4 : set(["server4", "server5"]),
1384 5 : set(["server1", "server2", "server5"]),
1385 6 : set(["server6"]),
1386 7 : set(["server7"]),
1387 8 : set(["server8"]),
1389 self.failUnlessEqual(expected, merge_servers(shares, set(trackers)))
1392 5 : set(["server5"]),
1393 6 : set(["server6"]),
1394 7 : set(["server7"]),
1395 8 : set(["server8"]),
1397 self.failUnlessEqual(expected, merge_servers(shares2, set(trackers)))
1401 for (i, server) in [(i, "server%d" % i) for i in xrange(10)]:
1402 shares3[i] = set([server])
1403 t = FakeServerTracker(server, [i])
1405 expected[i] = set([server])
1406 self.failUnlessEqual(expected, merge_servers(shares3, set(trackers)))
# Unit test for happinessutil.servers_of_happiness(): the returned
# happiness value is the size of a maximum matching in the bipartite
# share<->server graph. Covers duplicate servers, merging with
# trackers, redundant overlaps, the empty layout (happiness 0), and
# Zooko's two puzzle layouts from ticket #778.
# NOTE(review): this copy of the method appears to have interior lines
# elided (internal line numbers skip; e.g. the dict-literal openers and
# some tracker-list appends are missing); the code below is not
# syntactically complete as shown — compare against the original file.
1409 def test_servers_of_happiness_utility_function(self):
1410 # These tests are concerned with the servers_of_happiness()
1411 # utility function, and its underlying matching algorithm. Other
1412 # aspects of the servers_of_happiness behavior are tested
1413 # elsehwere These tests exist to ensure that
1414 # servers_of_happiness doesn't under or overcount the happiness
1415 # value for given inputs.
1417 # servers_of_happiness expects a dict of
1418 # shnum => set(serverids) as a preexisting shares argument.
1420 1 : set(["server1"]),
1421 2 : set(["server2"]),
1422 3 : set(["server3"]),
1423 4 : set(["server4"])
1425 happy = servers_of_happiness(test1)
1426 self.failUnlessEqual(4, happy)
1427 test1[4] = set(["server1"])
1428 # We've added a duplicate server, so now servers_of_happiness
1429 # should be 3 instead of 4.
1430 happy = servers_of_happiness(test1)
1431 self.failUnlessEqual(3, happy)
1432 # The second argument of merge_servers should be a set of objects with
1433 # serverid and buckets as attributes. In actual use, these will be
1434 # ServerTracker instances, but for testing it is fine to make a
1435 # FakeServerTracker whose job is to hold those instance variables to
1438 for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1439 t = FakeServerTracker(server, [i])
1441 # Recall that test1 is a server layout with servers_of_happiness
1442 # = 3. Since there isn't any overlap between the shnum ->
1443 # set([serverid]) correspondences in test1 and those in trackers,
1444 # the result here should be 7.
1445 test2 = merge_servers(test1, set(trackers))
1446 happy = servers_of_happiness(test2)
1447 self.failUnlessEqual(7, happy)
1448 # Now add an overlapping server to trackers. This is redundant,
1449 # so it should not cause the previously reported happiness value
1451 t = FakeServerTracker("server1", [1])
1453 test2 = merge_servers(test1, set(trackers))
1454 happy = servers_of_happiness(test2)
1455 self.failUnlessEqual(7, happy)
1457 happy = servers_of_happiness(test)
1458 self.failUnlessEqual(0, happy)
1459 # Test a more substantial overlap between the trackers and the
1460 # existing assignments.
1462 1 : set(['server1']),
1463 2 : set(['server2']),
1464 3 : set(['server3']),
1465 4 : set(['server4']),
1468 t = FakeServerTracker('server5', [4])
1470 t = FakeServerTracker('server6', [3, 5])
1472 # The value returned by servers_of_happiness is the size
1473 # of a maximum matching in the bipartite graph that
1474 # servers_of_happiness() makes between serverids and share
1475 # numbers. It should find something like this:
1476 # (server 1, share 1)
1477 # (server 2, share 2)
1478 # (server 3, share 3)
1479 # (server 5, share 4)
1480 # (server 6, share 5)
1482 # and, since there are 5 edges in this matching, it should
1484 test2 = merge_servers(test, set(trackers))
1485 happy = servers_of_happiness(test2)
1486 self.failUnlessEqual(5, happy)
1487 # Zooko's first puzzle:
1488 # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:156)
1490 # server 1: shares 0, 1
1491 # server 2: shares 1, 2
1494 # This should yield happiness of 3.
1496 0 : set(['server1']),
1497 1 : set(['server1', 'server2']),
1498 2 : set(['server2', 'server3']),
1500 self.failUnlessEqual(3, servers_of_happiness(test))
1501 # Zooko's second puzzle:
1502 # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:158)
1504 # server 1: shares 0, 1
1507 # This should yield happiness of 2.
1509 0 : set(['server1']),
1510 1 : set(['server1', 'server2']),
1512 self.failUnlessEqual(2, servers_of_happiness(test))
# Unit test for happinessutil.shares_by_server(): inverts a shnum ->
# set(serverid) dict into serverid -> set(shnum), and asserts (via
# shouldFail with AssertionError) that non-set values are rejected.
# NOTE(review): this copy of the method appears to have interior lines
# elided (internal line numbers skip; e.g. the `test1 = {` opener is
# missing); the code below is not syntactically complete as shown —
# compare against the original file.
1515 def test_shares_by_server(self):
1516 test = dict([(i, set(["server%d" % i])) for i in xrange(1, 5)])
1517 sbs = shares_by_server(test)
1518 self.failUnlessEqual(set([1]), sbs["server1"])
1519 self.failUnlessEqual(set([2]), sbs["server2"])
1520 self.failUnlessEqual(set([3]), sbs["server3"])
1521 self.failUnlessEqual(set([4]), sbs["server4"])
1523 1 : set(["server1"]),
1524 2 : set(["server1"]),
1525 3 : set(["server1"]),
1526 4 : set(["server2"]),
1527 5 : set(["server2"])
1529 sbs = shares_by_server(test1)
1530 self.failUnlessEqual(set([1, 2, 3]), sbs["server1"])
1531 self.failUnlessEqual(set([4, 5]), sbs["server2"])
1532 # This should fail unless the serverid part of the mapping is a set
1533 test2 = {1: "server1"}
1534 self.shouldFail(AssertionError,
1535 "test_shares_by_server",
1537 shares_by_server, test2)
# The server selector must count shares found on a read-only server
# toward happiness rather than filtering that server out; with server 1
# read-only holding shares 0-9 and happy=4 the upload should succeed.
# NOTE(review): this copy of the method appears to have interior lines
# elided (internal line numbers skip); the code below is not
# syntactically complete as shown — compare against the original file.
1540 def test_existing_share_detection(self):
1541 self.basedir = self.mktemp()
1542 d = self._setup_and_upload()
1543 # Our final setup should look like this:
1544 # server 1: shares 0 - 9, read-only
1548 # The purpose of this test is to make sure that the server selector
1549 # knows about the shares on server 1, even though it is read-only.
1550 # It used to simply filter these out, which would cause the test
1551 # to fail when servers_of_happiness = 4.
1552 d.addCallback(lambda ign:
1553 self._add_server_with_share(1, 0, True))
1554 d.addCallback(lambda ign:
1555 self._add_server(2))
1556 d.addCallback(lambda ign:
1557 self._add_server(3))
1558 d.addCallback(lambda ign:
1559 self._add_server(4))
1560 def _copy_shares(ign):
1561 for i in xrange(1, 10):
1562 self._copy_share_to_server(i, 1)
1563 d.addCallback(_copy_shares)
1564 d.addCallback(lambda ign:
1565 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1566 def _prepare_client(ign):
1567 client = self.g.clients[0]
1568 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1570 d.addCallback(_prepare_client)
1571 d.addCallback(lambda client:
1572 client.upload(upload.Data("data" * 10000, convergence="")))
1573 d.addCallback(lambda ign:
1574 self.failUnless(self._has_happy_share_distribution()))
# The query statistics reported in UploadUnhappinessError messages must
# be accurate. happy=45 is deliberately unsatisfiable so the failure
# message (and its query counts) can be inspected with shouldFail in
# three scenarios: all-writable servers, a mix with read-only servers,
# and a first server that already holds every share.
# NOTE(review): this copy of the method appears to have interior lines
# elided (internal line numbers skip; e.g. the `def _setup(ign):` /
# `def _reset(ign):` headers are missing); the code below is not
# syntactically complete as shown — compare against the original file.
1578 def test_query_counting(self):
1579 # If server selection fails, Tahoe2ServerSelector prints out a lot
1580 # of helpful diagnostic information, including query stats.
1581 # This test helps make sure that that information is accurate.
1582 self.basedir = self.mktemp()
1583 d = self._setup_and_upload()
1585 for i in xrange(1, 11):
1586 self._add_server(server_number=i)
1587 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1588 c = self.g.clients[0]
1589 # We set happy to an unsatisfiable value so that we can check the
1590 # counting in the exception message. The same progress message
1591 # is also used when the upload is successful, but in that case it
1592 # only gets written to a log, so we can't see what it says.
1593 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1595 d.addCallback(_setup)
1596 d.addCallback(lambda c:
1597 self.shouldFail(UploadUnhappinessError, "test_query_counting",
1598 "10 queries placed some shares",
1599 c.upload, upload.Data("data" * 10000,
1601 # Now try with some readonly servers. We want to make sure that
1602 # the readonly server share discovery phase is counted correctly.
1604 self.basedir = self.mktemp()
1606 d.addCallback(_reset)
1607 d.addCallback(lambda ign:
1608 self._setup_and_upload())
1610 for i in xrange(1, 11):
1611 self._add_server(server_number=i)
1612 self._add_server(server_number=11, readonly=True)
1613 self._add_server(server_number=12, readonly=True)
1614 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1615 c = self.g.clients[0]
1616 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1618 d.addCallback(_then)
1619 d.addCallback(lambda c:
1620 self.shouldFail(UploadUnhappinessError, "test_query_counting",
1621 "2 placed none (of which 2 placed none due to "
1622 "the server being full",
1623 c.upload, upload.Data("data" * 10000,
1625 # Now try the case where the upload process finds a bunch of the
1626 # shares that it wants to place on the first server, including
1627 # the one that it wanted to allocate there. Though no shares will
1628 # be allocated in this request, it should still be called
1629 # productive, since it caused some homeless shares to be
1631 d.addCallback(_reset)
1632 d.addCallback(lambda ign:
1633 self._setup_and_upload())
1636 for i in xrange(1, 11):
1637 self._add_server(server_number=i)
1638 # Copy all of the shares to server 9, since that will be
1639 # the first one that the selector sees.
1640 for i in xrange(10):
1641 self._copy_share_to_server(i, 9)
1642 # Remove server 0, and its contents
1643 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1644 # Make happiness unsatisfiable
1645 c = self.g.clients[0]
1646 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1648 d.addCallback(_next)
1649 d.addCallback(lambda c:
1650 self.shouldFail(UploadUnhappinessError, "test_query_counting",
1651 "1 queries placed some shares",
1652 c.upload, upload.Data("data" * 10000,
# With ten read-only servers and k=2/happy=4/n=4, the selector must
# bound how many read-only servers it queries: the failure message is
# required to report "sent 8 queries to 8 servers".
# NOTE(review): this copy of the method appears to have interior lines
# elided (internal line numbers skip; e.g. the `def _then(ign):` header
# is missing); the code below is not syntactically complete as shown —
# compare against the original file.
1657 def test_upper_limit_on_readonly_queries(self):
1658 self.basedir = self.mktemp()
1659 d = self._setup_and_upload()
1661 for i in xrange(1, 11):
1662 self._add_server(server_number=i, readonly=True)
1663 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1664 c = self.g.clients[0]
1665 c.DEFAULT_ENCODING_PARAMETERS['k'] = 2
1666 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1667 c.DEFAULT_ENCODING_PARAMETERS['n'] = 4
1669 d.addCallback(_then)
1670 d.addCallback(lambda client:
1671 self.shouldFail(UploadUnhappinessError,
1672 "test_upper_limit_on_readonly_queries",
1673 "sent 8 queries to 8 servers",
1675 upload.Data('data' * 10000, convergence="")))
# Exercises the exact wording of UploadUnhappinessError for five
# distinct failure modes of server selection: all servers read-only;
# read-only plus one broken server; too few (2) servers; enough servers
# but happy=7 unreachable; and shares present but too unevenly spread.
# Each sub-scenario pins the full diagnostic message via shouldFail.
# NOTE(review): this copy of the method appears to have interior lines
# elided (internal line numbers skip; e.g. the `def _reset(ign):`
# header is missing); the code below is not syntactically complete as
# shown — compare against the original file.
1679 def test_exception_messages_during_server_selection(self):
1680 # server 1: read-only, no shares
1681 # server 2: read-only, no shares
1682 # server 3: read-only, no shares
1683 # server 4: read-only, no shares
1684 # server 5: read-only, no shares
1685 # This will fail, but we want to make sure that the log messages
1686 # are informative about why it has failed.
1687 self.basedir = self.mktemp()
1688 d = self._setup_and_upload()
1689 d.addCallback(lambda ign:
1690 self._add_server(server_number=1, readonly=True))
1691 d.addCallback(lambda ign:
1692 self._add_server(server_number=2, readonly=True))
1693 d.addCallback(lambda ign:
1694 self._add_server(server_number=3, readonly=True))
1695 d.addCallback(lambda ign:
1696 self._add_server(server_number=4, readonly=True))
1697 d.addCallback(lambda ign:
1698 self._add_server(server_number=5, readonly=True))
1699 d.addCallback(lambda ign:
1700 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1701 def _reset_encoding_parameters(ign, happy=4):
1702 client = self.g.clients[0]
1703 client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1705 d.addCallback(_reset_encoding_parameters)
1706 d.addCallback(lambda client:
1707 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1708 "placed 0 shares out of 10 "
1709 "total (10 homeless), want to place shares on at "
1710 "least 4 servers such that any 3 of them have "
1711 "enough shares to recover the file, "
1712 "sent 5 queries to 5 servers, 0 queries placed "
1713 "some shares, 5 placed none "
1714 "(of which 5 placed none due to the server being "
1715 "full and 0 placed none due to an error)",
1717 upload.Data("data" * 10000, convergence="")))
1720 # server 1: read-only, no shares
1721 # server 2: broken, no shares
1722 # server 3: read-only, no shares
1723 # server 4: read-only, no shares
1724 # server 5: read-only, no shares
1726 self.basedir = self.mktemp()
1727 d.addCallback(_reset)
1728 d.addCallback(lambda ign:
1729 self._setup_and_upload())
1730 d.addCallback(lambda ign:
1731 self._add_server(server_number=1, readonly=True))
1732 d.addCallback(lambda ign:
1733 self._add_server(server_number=2))
1734 def _break_server_2(ign):
1735 serverid = self.g.servers_by_number[2].my_nodeid
1736 self.g.break_server(serverid)
1737 d.addCallback(_break_server_2)
1738 d.addCallback(lambda ign:
1739 self._add_server(server_number=3, readonly=True))
1740 d.addCallback(lambda ign:
1741 self._add_server(server_number=4, readonly=True))
1742 d.addCallback(lambda ign:
1743 self._add_server(server_number=5, readonly=True))
1744 d.addCallback(lambda ign:
1745 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1746 d.addCallback(_reset_encoding_parameters)
1747 d.addCallback(lambda client:
1748 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1749 "placed 0 shares out of 10 "
1750 "total (10 homeless), want to place shares on at "
1751 "least 4 servers such that any 3 of them have "
1752 "enough shares to recover the file, "
1753 "sent 5 queries to 5 servers, 0 queries placed "
1754 "some shares, 5 placed none "
1755 "(of which 4 placed none due to the server being "
1756 "full and 1 placed none due to an error)",
1758 upload.Data("data" * 10000, convergence="")))
1759 # server 0, server 1 = empty, accepting shares
1760 # This should place all of the shares, but still fail with happy=4.
1761 # We want to make sure that the exception message is worded correctly.
1762 d.addCallback(_reset)
1763 d.addCallback(lambda ign:
1765 d.addCallback(lambda ign:
1766 self._add_server(server_number=1))
1767 d.addCallback(_reset_encoding_parameters)
1768 d.addCallback(lambda client:
1769 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1770 "shares could be placed or found on only 2 "
1771 "server(s). We were asked to place shares on at "
1772 "least 4 server(s) such that any 3 of them have "
1773 "enough shares to recover the file.",
1774 client.upload, upload.Data("data" * 10000,
1776 # servers 0 - 4 = empty, accepting shares
1777 # This too should place all the shares, and this too should fail,
1778 # but since the effective happiness is more than the k encoding
1779 # parameter, it should trigger a different error message than the one
1781 d.addCallback(_reset)
1782 d.addCallback(lambda ign:
1784 d.addCallback(lambda ign:
1785 self._add_server(server_number=1))
1786 d.addCallback(lambda ign:
1787 self._add_server(server_number=2))
1788 d.addCallback(lambda ign:
1789 self._add_server(server_number=3))
1790 d.addCallback(lambda ign:
1791 self._add_server(server_number=4))
1792 d.addCallback(_reset_encoding_parameters, happy=7)
1793 d.addCallback(lambda client:
1794 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1795 "shares could be placed on only 5 server(s) such "
1796 "that any 3 of them have enough shares to recover "
1797 "the file, but we were asked to place shares on "
1798 "at least 7 such servers.",
1799 client.upload, upload.Data("data" * 10000,
1801 # server 0: shares 0 - 9
1802 # server 1: share 0, read-only
1803 # server 2: share 0, read-only
1804 # server 3: share 0, read-only
1805 # This should place all of the shares, but fail with happy=4.
1806 # Since the number of servers with shares is more than the number
1807 # necessary to reconstitute the file, this will trigger a different
1808 # error message than either of those above.
1809 d.addCallback(_reset)
1810 d.addCallback(lambda ign:
1811 self._setup_and_upload())
1812 d.addCallback(lambda ign:
1813 self._add_server_with_share(server_number=1, share_number=0,
1815 d.addCallback(lambda ign:
1816 self._add_server_with_share(server_number=2, share_number=0,
1818 d.addCallback(lambda ign:
1819 self._add_server_with_share(server_number=3, share_number=0,
1821 d.addCallback(_reset_encoding_parameters, happy=7)
1822 d.addCallback(lambda client:
1823 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1824 "shares could be placed or found on 4 server(s), "
1825 "but they are not spread out evenly enough to "
1826 "ensure that any 3 of these servers would have "
1827 "enough shares to recover the file. We were asked "
1828 "to place shares on at least 7 servers such that "
1829 "any 3 of them have enough shares to recover the "
1831 client.upload, upload.Data("data" * 10000,
# Regression test for ticket #778 comment:187, a k=2/n=3 layout that
# broke an early share-redistribution algorithm. Marked .todo below, so
# Trial expects it to fail until the uploader is fixed.
# NOTE(review): this copy of the method appears to have interior lines
# elided (internal line numbers skip); the code below is not
# syntactically complete as shown — compare against the original file.
1836 def test_problem_layout_comment_187(self):
1837 # #778 comment 187 broke an initial attempt at a share
1838 # redistribution algorithm. This test is here to demonstrate the
1839 # breakage, and to test that subsequent algorithms don't also
1840 # break in the same way.
1841 self.basedir = self.mktemp()
1842 d = self._setup_and_upload(k=2, n=3)
1844 # server 1: shares 0, 1, 2, readonly
1845 # server 2: share 0, readonly
1848 self._add_server_with_share(server_number=1, share_number=0,
1850 self._add_server_with_share(server_number=2, share_number=0,
1852 self._add_server_with_share(server_number=3, share_number=0)
1854 self._copy_share_to_server(1, 1)
1855 self._copy_share_to_server(2, 1)
1857 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1858 client = self.g.clients[0]
1859 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 3
1862 d.addCallback(_setup)
1863 d.addCallback(lambda client:
1864 client.upload(upload.Data("data" * 10000, convergence="")))
1865 d.addCallback(lambda ign:
1866 self.failUnless(self._has_happy_share_distribution()))
1868 test_problem_layout_comment_187.todo = "this isn't fixed yet"
# Regression test for ticket #1118 (user hit an assertion in upload).
# The layout could actually be uploaded successfully (see
# test_problem_layout_ticket_1128), but #1118 only requires that the
# uploader fail cleanly with UploadUnhappinessError instead of
# asserting.
# NOTE(review): this copy of the method appears to have interior lines
# elided (internal line numbers skip; e.g. the `def _setup(ign):`
# header is missing); the code below is not syntactically complete as
# shown — compare against the original file.
1870 def test_problem_layout_ticket_1118(self):
1871 # #1118 includes a report from a user who hit an assertion in
1872 # the upload code with this layout.
1873 self.basedir = self.mktemp()
1874 d = self._setup_and_upload(k=2, n=4)
1876 # server 0: no shares
1877 # server 1: shares 0, 3
1880 # The order that they get queries is 0, 1, 3, 2
1882 self._add_server(server_number=0)
1883 self._add_server_with_share(server_number=1, share_number=0)
1884 self._add_server_with_share(server_number=2, share_number=2)
1885 self._add_server_with_share(server_number=3, share_number=1)
1887 self._copy_share_to_server(3, 1)
1888 storedir = self.get_serverdir(0)
1889 # remove the storedir, wiping out any existing shares
1890 shutil.rmtree(storedir)
1891 # create an empty storedir to replace the one we just removed
1893 client = self.g.clients[0]
1894 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1897 d.addCallback(_setup)
1898 # Note: actually it should succeed! See
1899 # test_problem_layout_ticket_1128. But ticket 1118 is just to
1900 # make it realize that it has failed, so if it raises
1901 # UploadUnhappinessError then we'll give it the green light
1903 d.addCallback(lambda ignored:
1904 self.shouldFail(UploadUnhappinessError,
1905 "test_problem_layout_ticket_1118",
1907 self.g.clients[0].upload, upload.Data("data" * 10000,
# NOTE(review): same gapped listing as the 1118 test above -- the
# `_setup(ign)` header, `return client`, and `return d` fall in the gaps.
# Code kept byte-identical; only comments added.
1911 def test_problem_layout_ticket_1128(self):
# NOTE(review): the comment below says "#1118" but this method tests
# ticket #1128 -- looks like a copy-paste from the previous test.
1912 # #1118 includes a report from a user who hit an assertion in
1913 # the upload code with this layout.
1914 self.basedir = self.mktemp()
1915 d = self._setup_and_upload(k=2, n=4)
1917 # server 0: no shares
1918 # server 1: shares 0, 3
1921 # The order that they get queries is 0, 1, 3, 2
1923 self._add_server(server_number=0)
1924 self._add_server_with_share(server_number=1, share_number=0)
1925 self._add_server_with_share(server_number=2, share_number=2)
1926 self._add_server_with_share(server_number=3, share_number=1)
1928 self._copy_share_to_server(3, 1)
1929 storedir = self.get_serverdir(0)
1930 # remove the storedir, wiping out any existing shares
1931 shutil.rmtree(storedir)
1932 # create an empty storedir to replace the one we just removed
# presumably an os.mkdir(storedir) call sits in the gap here -- TODO confirm
1934 client = self.g.clients[0]
1935 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1938 d.addCallback(_setup)
# Unlike ticket 1118, here the upload is expected to SUCCEED and end with
# a happy share distribution -- hence the .todo marker below.
1939 d.addCallback(lambda client:
1940 client.upload(upload.Data("data" * 10000, convergence="")))
1941 d.addCallback(lambda ign:
1942 self.failUnless(self._has_happy_share_distribution()))
# trial's .todo attribute: the test is expected to fail until a smarter
# uploader exists.
1944 test_problem_layout_ticket_1128.todo = "Invent a smarter uploader that uploads successfully in this case."
# NOTE(review): gapped listing -- the continuation arguments of the four
# _add_server_with_share calls (presumably readonly=True, matching the
# "read-only" comments; TODO confirm), the `return c` of _server_setup,
# and the final `return d` are not visible. Code kept byte-identical.
1946 def test_upload_succeeds_with_some_homeless_shares(self):
1947 # If the upload is forced to stop trying to place shares before
1948 # it has placed (or otherwise accounted) for all of them, but it
1949 # has placed enough to satisfy the upload health criteria that
1950 # we're using, it should still succeed.
1951 self.basedir = self.mktemp()
1952 d = self._setup_and_upload()
1953 def _server_setup(ign):
1954 # Add four servers so that we have a layout like this:
1955 # server 1: share 0, read-only
1956 # server 2: share 1, read-only
1957 # server 3: share 2, read-only
1958 # server 4: share 3, read-only
1959 # If we set happy = 4, the upload will manage to satisfy
1960 # servers of happiness, but not place all of the shares; we
1961 # want to test that the upload is declared successful in
1963 self._add_server_with_share(server_number=1, share_number=0,
1965 self._add_server_with_share(server_number=2, share_number=1,
1967 self._add_server_with_share(server_number=3, share_number=2,
1969 self._add_server_with_share(server_number=4, share_number=3,
# Drop the original writable server so only the four read-only
# servers above remain.
1972 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1973 # Set the client appropriately
1974 c = self.g.clients[0]
1975 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1977 d.addCallback(_server_setup)
1978 d.addCallback(lambda client:
1979 client.upload(upload.Data("data" * 10000, convergence="")))
1980 d.addCallback(lambda ign:
1981 self.failUnless(self._has_happy_share_distribution()))
# NOTE(review): gapped listing -- original line 2003 (the `for` header
# that binds `i` used by _copy_share_to_server below) and the trailing
# `return d` are absent. Code kept byte-identical; only comments added.
1985 def test_uploader_skips_over_servers_with_only_one_share(self):
1986 # We want to make sure that the redistribution logic ignores
1987 # servers with only one share, since placing these shares
1988 # elsewhere will at best keep happiness the same as it was, and
1990 self.basedir = self.mktemp()
1991 d = self._setup_and_upload()
1992 def _server_setup(ign):
1993 # Add some servers so that the upload will need to
1994 # redistribute, but will first pass over a couple of servers
1995 # that don't have enough shares to redistribute before
1996 # finding one that does have shares to redistribute.
1997 self._add_server_with_share(server_number=1, share_number=0)
1998 self._add_server_with_share(server_number=2, share_number=2)
1999 self._add_server_with_share(server_number=3, share_number=1)
2000 self._add_server_with_share(server_number=8, share_number=4)
2001 self._add_server_with_share(server_number=5, share_number=5)
2002 self._add_server_with_share(server_number=10, share_number=7)
# `i` comes from a loop header in the gap at original line 2003 --
# presumably copying several shares onto server 2 so it is the one
# server with shares worth redistributing; TODO confirm.
2004 self._copy_share_to_server(i, 2)
2005 return self.g.clients[0]
2006 d.addCallback(_server_setup)
2007 d.addCallback(lambda client:
2008 client.upload(upload.Data("data" * 10000, convergence="")))
2009 d.addCallback(lambda ign:
2010 self.failUnless(self._has_happy_share_distribution()))
# NOTE(review): gapped listing -- the continuation of the shouldFail call
# (original lines 2030, 2032: presumably the failure-substring argument and
# convergence="" -- TODO confirm), the `def _then(ign):` header for the loop
# below, and the final `return d` are absent. Code kept byte-identical.
2014 def test_server_selector_bucket_abort(self):
2015 # If server selection for an upload fails due to an unhappy
2016 # layout, the server selection process should abort the buckets it
2017 # allocates before failing, so that the space can be re-used.
2018 self.basedir = self.mktemp()
2019 self.set_up_grid(num_servers=5)
2021 # Try to upload a file with happy=7, which is unsatisfiable with
2022 # the current grid. This will fail, but should not take up any
2023 # space on the storage servers after it fails.
2024 client = self.g.clients[0]
2025 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
2026 d = defer.succeed(None)
# NOTE(review): the label string says "selection" while the method is
# named "selector". Harmless (the string is only a log/message label for
# shouldFail), but worth aligning.
2027 d.addCallback(lambda ignored:
2028 self.shouldFail(UploadUnhappinessError,
2029 "test_server_selection_bucket_abort",
2031 client.upload, upload.Data("data" * 10000,
2033 # wait for the abort messages to get there.
# fireEventually yields one reactor turn so the in-flight abort
# messages are delivered before allocated_size() is checked.
2034 def _turn_barrier(res):
2035 return fireEventually(res)
2036 d.addCallback(_turn_barrier)
# After the failed upload, no server may retain any allocated space.
2038 for server in self.g.servers_by_number.values():
2039 self.failUnlessEqual(server.allocated_size(), 0)
2040 d.addCallback(_then)
# NOTE(review): gapped listing -- the shouldFail continuation (original
# line 2058, presumably the failure-substring argument -- TODO confirm),
# the `def _then(ign):` header, and the final `return d` are absent.
# Code kept byte-identical; only comments added.
2044 def test_encoder_bucket_abort(self):
2045 # If enough servers die in the process of encoding and uploading
2046 # a file to make the layout unhappy, we should cancel the
2047 # newly-allocated buckets before dying.
2048 self.basedir = self.mktemp()
2049 self.set_up_grid(num_servers=4)
2051 client = self.g.clients[0]
# happy=7 on a 4-server grid guarantees unhappiness once servers break.
2052 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
2054 d = defer.succeed(None)
2055 d.addCallback(lambda ignored:
2056 self.shouldFail(UploadUnhappinessError,
2057 "test_encoder_bucket_abort",
# _do_upload_with_broken_servers(1) breaks servers mid-encode; defined
# elsewhere in this class (not visible in this fragment).
2059 self._do_upload_with_broken_servers, 1))
# one reactor turn so bucket-abort messages arrive before the check.
2060 def _turn_barrier(res):
2061 return fireEventually(res)
2062 d.addCallback(_turn_barrier)
# The failed encode must leave zero allocated space on every server.
2064 for server in self.g.servers_by_number.values():
2065 self.failUnlessEqual(server.allocated_size(), 0)
2066 d.addCallback(_then)
# NOTE(review): truncated fragment -- original lines 2074-2075 and
# 2079-2083 are absent; presumably they write the remaining config
# (including the saved `oldcfg`) and close `f` -- TODO confirm.
# Code kept byte-identical; only comments added.
# Hook called by the node-setup mixin: prepend [client] share parameters
# to each client's tahoe.cfg, preserving the existing contents in oldcfg.
2070 def _set_up_nodes_extra_config(self, clientdir):
2071 cfgfn = os.path.join(clientdir, "tahoe.cfg")
# NOTE(review): the read handle is never explicitly closed in the
# visible code -- relies on CPython refcounting; a `with` block (or
# explicit close) would be cleaner.
2072 oldcfg = open(cfgfn, "r").read()
2073 f = open(cfgfn, "wt")
2076 f.write("[client]\n")
2077 f.write("shares.needed = 7\n")
2078 f.write("shares.total = 12\n")
2084 # upload with exactly 75 servers (shares_of_happiness)
2085 # have a download fail
2086 # cancel a download (need to implement more cancel stuff)
2089 # NoNetworkGrid, upload part of ciphertext, kill server, continue upload
2090 # check with Kevan: these want to live in test_upload; existing tests might cover them
2091 # def test_lost_one_shareholder(self): # these are upload-side tests
2092 # def test_lost_one_shareholder_early(self):
2093 # def test_lost_many_shareholders(self):
2094 # def test_lost_all_shareholders(self):