2 from cStringIO import StringIO
3 from twisted.trial import unittest
4 from twisted.python.failure import Failure
5 from twisted.internet import defer
6 from foolscap.api import fireEventually
8 import allmydata # for __full_version__
9 from allmydata import uri, monitor, client
10 from allmydata.immutable import upload, encode
11 from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
12 from allmydata.util.assertutil import precondition
13 from allmydata.util.deferredutil import DeferredListShouldSucceed
14 from allmydata.test.no_network import GridTestMixin
15 from allmydata.test.common_util import ShouldFailMixin
16 from allmydata.util.happinessutil import servers_of_happiness, \
17 shares_by_server, merge_peers
18 from allmydata.storage_client import StorageFarmBroker
19 from allmydata.storage.server import storage_index_to_dir
23 def extract_uri(results):
26 # Some of these took longer than 480 seconds on Zandr's arm box, but this may
27 # have been due to an earlier test ERROR'ing out due to timeout, which seems
28 # to screw up subsequent tests.
# Tests for the three Uploadable flavors: FileHandle, FileName, and Data.
# NOTE(review): this source is a degraded paste -- stale line numbers are
# embedded, indentation is lost, and interior lines are missing. Comments
# below describe only what the visible lines demonstrate.
31 class Uploadable(unittest.TestCase):
# Helper: `data` must be a list of str chunks; joined (joining line is
# missing from this view) they must equal `expected`.
32 def shouldEqual(self, data, expected):
33 self.failUnless(isinstance(data, list))
35 self.failUnless(isinstance(e, str))
37 self.failUnlessEqual(s, expected)
# Exercise upload.FileHandle with a random key (convergence=None) ...
39 def test_filehandle_random_key(self):
40 return self._test_filehandle(convergence=None)
# ... and with convergent encryption enabled.
42 def test_filehandle_convergent_encryption(self):
43 return self._test_filehandle(convergence="some convergence string")
# Shared body: wrap a filehandle (creation line missing here) in
# upload.FileHandle, check get_size() == 41, read one byte then the rest,
# then close. Closing the Uploadable must NOT close the underlying handle.
45 def _test_filehandle(self, convergence):
47 u = upload.FileHandle(s, convergence=convergence)
49 d.addCallback(self.failUnlessEqual, 41)
50 d.addCallback(lambda res: u.read(1))
51 d.addCallback(self.shouldEqual, "a")
52 d.addCallback(lambda res: u.read(80))
53 d.addCallback(self.shouldEqual, "a"*40)
54 d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
55 d.addCallback(lambda res: s.close()) # that privilege is reserved for us
# Same read protocol applied to upload.FileName over a real on-disk file.
58 def test_filename(self):
59 basedir = "upload/Uploadable/test_filename"
61 fn = os.path.join(basedir, "file")
65 u = upload.FileName(fn, convergence=None)
67 d.addCallback(self.failUnlessEqual, 41)
68 d.addCallback(lambda res: u.read(1))
69 d.addCallback(self.shouldEqual, "a")
70 d.addCallback(lambda res: u.read(80))
71 d.addCallback(self.shouldEqual, "a"*40)
72 d.addCallback(lambda res: u.close())
# Same protocol applied to upload.Data. The enclosing def is missing from
# this view -- presumably test_data; confirm against the full file.
77 u = upload.Data(s, convergence=None)
79 d.addCallback(self.failUnlessEqual, 41)
80 d.addCallback(lambda res: u.read(1))
81 d.addCallback(self.shouldEqual, "a")
82 d.addCallback(lambda res: u.read(80))
83 d.addCallback(self.shouldEqual, "a"*40)
84 d.addCallback(lambda res: u.close())
87 class ServerError(Exception):
# SetDEPMixin helper (the enclosing class header is missing from this view):
# overwrite the fake client's DEFAULT_ENCODING_PARAMETERS with the given
# k/happy/n and segment size. The lines building dict `p` are partly missing.
91 def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
95 "max_segment_size": max_segsize,
97 self.node.DEFAULT_ENCODING_PARAMETERS = p
# In-memory stand-in for a remote storage server. `mode` selects failure
# behavior ("first-fail", "second-fail", "full", "already got them", or a
# small max-share-size variant). Interior lines are missing from this view.
99 class FakeStorageServer:
100 def __init__(self, mode):
# Advertise a large maximum share size in the normal case ...
104 self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
105 { "maximum-immutable-share-size": 2**32 },
106 "application-version": str(allmydata.__full_version__),
# ... and a tiny (10-byte) maximum in the alternate branch, presumably the
# "small" mode -- TODO confirm against the missing branch condition.
109 self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
110 { "maximum-immutable-share-size": 10 },
111 "application-version": str(allmydata.__full_version__),
# Dispatch a "remote" call to the local method of the same name; the
# deferred plumbing around _call is partly missing from this view.
115 def callRemote(self, methname, *args, **kwargs):
117 meth = getattr(self, methname)
118 return meth(*args, **kwargs)
120 d.addCallback(lambda res: _call())
# Simulate bucket allocation: raise/fail according to `mode` and the query
# count, record each (storage_index, shnum) request in self.allocated,
# and hand back FakeBucketWriters for the requested share numbers.
123 def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
124 sharenums, share_size, canary):
125 #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
126 if self.mode == "first-fail":
127 if self.queries == 0:
129 if self.mode == "second-fail":
130 if self.queries == 1:
133 if self.mode == "full":
135 elif self.mode == "already got them":
# "already got them": claim every requested share already exists.
136 return (set(sharenums), {},)
138 for shnum in sharenums:
139 self.allocated.append( (storage_index, shnum) )
141 dict([( shnum, FakeBucketWriter(share_size) )
142 for shnum in sharenums]),
145 class FakeBucketWriter:
146 # a diagnostic version of storageserver.BucketWriter
147 def __init__(self, size):
# Buffer all written share data in memory.
148 self.data = StringIO()
# Dispatch a "remote" call to the matching remote_* method; the deferred
# plumbing around _call is partly missing from this view.
152 def callRemote(self, methname, *args, **kwargs):
154 meth = getattr(self, "remote_" + methname)
155 return meth(*args, **kwargs)
157 d.addCallback(lambda res: _call())
# Fire-and-forget variant: make the call and deliberately drop the deferred.
161 def callRemoteOnly(self, methname, *args, **kwargs):
162 d = self.callRemote(methname, *args, **kwargs)
163 del d # callRemoteOnly ignores this
# Write `data` at `offset`, enforcing that the bucket is open and the
# write stays within the declared share size.
167 def remote_write(self, offset, data):
168 precondition(not self.closed)
169 precondition(offset >= 0)
170 precondition(offset+len(data) <= self._size,
171 "offset=%d + data=%d > size=%d" %
172 (offset, len(data), self._size))
173 self.data.seek(offset)
174 self.data.write(data)
# Close may only happen once; the line setting self.closed is missing here.
176 def remote_close(self):
177 precondition(not self.closed)
180 def remote_abort(self):
# Interior of the FakeClient class (its `class` header line is missing from
# this view). Default k/happy/n values sit on the missing lines 185-186.
184 DEFAULT_ENCODING_PARAMETERS = {"k":25,
187 "max_segment_size": 1*MiB,
# Build `num_servers` FakeStorageServers. `mode` may be one string for all
# servers or a dict mapping server index -> mode string.
189 def __init__(self, mode="good", num_servers=50):
190 self.num_servers = num_servers
191 if type(mode) is str:
192 mode = dict([i,mode] for i in range(num_servers))
# serverids are fixed-width decimal strings so permutation is deterministic.
193 peers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
194 for fakeid in range(self.num_servers) ]
195 self.storage_broker = StorageFarmBroker(None, permute_peers=True)
196 for (serverid, server) in peers:
197 self.storage_broker.test_add_server(serverid, server)
# Keep the server objects so tests can inspect allocations/query counts.
198 self.last_peers = [p[1] for p in peers]
200 def log(self, *args, **kwargs):
202 def get_encoding_parameters(self):
203 return self.DEFAULT_ENCODING_PARAMETERS
204 def get_storage_broker(self):
205 return self.storage_broker
# Shared class-level secrets; fine for tests, never for real clients.
206 _secret_holder = client.SecretHolder("lease secret", "convergence secret")
208 class GotTooFarError(Exception):
# An Uploadable that pretends to be `size` bytes of zeros without holding
# them in memory; used to test the too-large rejection path. If more than
# ~1MB is actually read, the size check failed and we abort the test early.
211 class GiganticUploadable(upload.FileHandle):
212 def __init__(self, size):
216 def get_encryption_key(self):
217 return defer.succeed("\x00" * 16)
# (def get_size line is missing from this view; this is its body.)
219 return defer.succeed(self._size)
220 def read(self, length):
221 left = self._size - self._fp
222 length = min(left, length)
224 if self._fp > 1000000:
225 # terminate the test early.
226 raise GotTooFarError("we shouldn't be allowed to get this far")
227 return defer.succeed(["\x00" * length])
232 Once upon a time, there was a beautiful princess named Buttercup. She lived
233 in a magical land where every file was stored securely among millions of
234 machines, and nobody ever worried about their data being lost ever again.
237 assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
241 SIZE_LARGE = len(DATA)
def upload_data(uploader, data):
    """Upload the byte string `data` through `uploader`, with a random key."""
    uploadable = upload.Data(data, convergence=None)
    return uploader.upload(uploadable)
def upload_filename(uploader, filename):
    """Upload the on-disk file named `filename` through `uploader`."""
    uploadable = upload.FileName(filename, convergence=None)
    return uploader.upload(uploadable)
def upload_filehandle(uploader, fh):
    """Upload the contents of the open filehandle `fh` through `uploader`."""
    uploadable = upload.FileHandle(fh, convergence=None)
    return uploader.upload(uploadable)
# Uploads against all-"good" fake servers. The lines below belong to setUp
# (its def line is missing from this view): build the client and a running
# Uploader attached to it.
253 class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
255 self.node = FakeClient(mode="good")
256 self.u = upload.Uploader()
257 self.u.running = True
258 self.u.parent = self.node
260 def _check_small(self, newuri, size):
261 u = uri.from_string(newuri)
262 self.failUnless(isinstance(u, uri.LiteralFileURI))
263 self.failUnlessEqual(len(u.data), size)
265 def _check_large(self, newuri, size):
266 u = uri.from_string(newuri)
267 self.failUnless(isinstance(u, uri.CHKFileURI))
268 self.failUnless(isinstance(u.get_storage_index(), str))
269 self.failUnlessEqual(len(u.get_storage_index()), 16)
270 self.failUnless(isinstance(u.key, str))
271 self.failUnlessEqual(len(u.key), 16)
272 self.failUnlessEqual(u.size, size)
# get_data's body (line 275) is missing from this view.
274 def get_data(self, size):
# Verify that files over the 2**64 limit are rejected. The line defining
# `big` (line 283) is missing here -- presumably 2**64 or close; confirm.
277 def test_too_large(self):
278 # we've removed the 4GiB share size limit (see ticket #346 for
279 # details), but still have an 8-byte field, so the limit is now
280 # 2**64, so make sure we reject files larger than that.
281 k = 3; happy = 7; n = 10
282 self.set_encoding_parameters(k, happy, n)
284 data1 = GiganticUploadable(big)
285 d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
286 "This file is too large to be uploaded (data_size)",
287 self.u.upload, data1)
288 data2 = GiganticUploadable(big-3)
289 d.addCallback(lambda res:
290 self.shouldFail(FileTooLargeError,
291 "test_too_large-data2",
292 "This file is too large to be uploaded (offsets)",
293 self.u.upload, data2))
294 # I don't know where the actual limit is.. it depends upon how large
295 # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
# The happy-path matrix: each Uploadable flavor (Data / FileHandle /
# FileName) at zero/small/large sizes. Zero and small sizes should yield
# literal URIs; large sizes should yield CHK URIs. `return d` lines appear
# to be missing from this view at the end of each test -- confirm.
298 def test_data_zero(self):
299 data = self.get_data(SIZE_ZERO)
300 d = upload_data(self.u, data)
301 d.addCallback(extract_uri)
302 d.addCallback(self._check_small, SIZE_ZERO)
305 def test_data_small(self):
306 data = self.get_data(SIZE_SMALL)
307 d = upload_data(self.u, data)
308 d.addCallback(extract_uri)
309 d.addCallback(self._check_small, SIZE_SMALL)
312 def test_data_large(self):
313 data = self.get_data(SIZE_LARGE)
314 d = upload_data(self.u, data)
315 d.addCallback(extract_uri)
316 d.addCallback(self._check_large, SIZE_LARGE)
319 def test_data_large_odd_segments(self):
320 data = self.get_data(SIZE_LARGE)
321 segsize = int(SIZE_LARGE / 2.5)
322 # we want 3 segments, since that's not a power of two
323 self.set_encoding_parameters(25, 25, 100, segsize)
324 d = upload_data(self.u, data)
325 d.addCallback(extract_uri)
326 d.addCallback(self._check_large, SIZE_LARGE)
329 def test_filehandle_zero(self):
330 data = self.get_data(SIZE_ZERO)
331 d = upload_filehandle(self.u, StringIO(data))
332 d.addCallback(extract_uri)
333 d.addCallback(self._check_small, SIZE_ZERO)
336 def test_filehandle_small(self):
337 data = self.get_data(SIZE_SMALL)
338 d = upload_filehandle(self.u, StringIO(data))
339 d.addCallback(extract_uri)
340 d.addCallback(self._check_small, SIZE_SMALL)
343 def test_filehandle_large(self):
344 data = self.get_data(SIZE_LARGE)
345 d = upload_filehandle(self.u, StringIO(data))
346 d.addCallback(extract_uri)
347 d.addCallback(self._check_large, SIZE_LARGE)
# The filename variants write `data` to disk first; those write lines
# (e.g. 354-355) are missing from this view.
350 def test_filename_zero(self):
351 fn = "Uploader-test_filename_zero.data"
353 data = self.get_data(SIZE_ZERO)
356 d = upload_filename(self.u, fn)
357 d.addCallback(extract_uri)
358 d.addCallback(self._check_small, SIZE_ZERO)
361 def test_filename_small(self):
362 fn = "Uploader-test_filename_small.data"
364 data = self.get_data(SIZE_SMALL)
367 d = upload_filename(self.u, fn)
368 d.addCallback(extract_uri)
369 d.addCallback(self._check_small, SIZE_SMALL)
372 def test_filename_large(self):
373 fn = "Uploader-test_filename_large.data"
375 data = self.get_data(SIZE_LARGE)
378 d = upload_filename(self.u, fn)
379 d.addCallback(extract_uri)
380 d.addCallback(self._check_large, SIZE_LARGE)
# Uploads against servers configured to error out on their first or second
# allocate_buckets call.
383 class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
384 def make_node(self, mode, num_servers=10):
385 self.node = FakeClient(mode, num_servers)
386 self.u = upload.Uploader()
387 self.u.running = True
388 self.u.parent = self.node
390 def _check_large(self, newuri, size):
391 u = uri.from_string(newuri)
392 self.failUnless(isinstance(u, uri.CHKFileURI))
393 self.failUnless(isinstance(u.get_storage_index(), str))
394 self.failUnlessEqual(len(u.get_storage_index()), 16)
395 self.failUnless(isinstance(u.key, str))
396 self.failUnlessEqual(len(u.key), 16)
397 self.failUnlessEqual(u.size, size)
# One bad server among good ones: the upload should still succeed.
# The make_node call (line 401) and trailing `return d` lines are missing
# from this view.
399 def test_first_error(self):
400 mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
402 self.set_encoding_parameters(k=25, happy=1, n=50)
403 d = upload_data(self.u, DATA)
404 d.addCallback(extract_uri)
405 d.addCallback(self._check_large, SIZE_LARGE)
# Every server fails the first request: peer selection must fail with a
# helpful message. (The `def _check(f):` line 413 is missing here.)
408 def test_first_error_all(self):
409 self.make_node("first-fail")
410 d = self.shouldFail(UploadUnhappinessError, "first_error_all",
411 "peer selection failed",
412 upload_data, self.u, DATA)
414 self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
415 # there should also be a 'last failure was' message
416 self.failUnlessIn("ServerError", str(f.value))
417 d.addCallback(_check)
420 def test_second_error(self):
421 # we want to make sure we make it to a third pass. This means that
422 # the first pass was insufficient to place all shares, and at least
423 # one of second pass servers (other than the last one) accepted a
424 # share (so we'll believe that a third pass will be useful). (if
425 # everyone but the last server throws an error, then we'll send all
426 # the remaining shares to the last server at the end of the second
427 # pass, and if that succeeds, we won't make it to a third pass).
429 # we can achieve this 97.5% of the time by using 40 servers, having
430 # 39 of them fail on the second request, leaving only one to succeed
431 # on the second request. (we need to keep the number of servers low
432 # enough to ensure a second pass with 100 shares).
433 mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
434 self.make_node(mode, 40)
435 d = upload_data(self.u, DATA)
436 d.addCallback(extract_uri)
437 d.addCallback(self._check_large, SIZE_LARGE)
# Every server fails the second request: first pass placed 10 shares, then
# everything failed. (The `def _check(f):` line 445 is missing here.)
440 def test_second_error_all(self):
441 self.make_node("second-fail")
442 d = self.shouldFail(UploadUnhappinessError, "second_error_all",
443 "peer selection failed",
444 upload_data, self.u, DATA)
446 self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
447 # there should also be a 'last failure was' message
448 self.failUnlessIn("ServerError", str(f.value))
449 d.addCallback(_check)
# All servers report themselves full; uploads must fail. The lines below
# belong to setUp (its def line is missing from this view).
452 class FullServer(unittest.TestCase):
454 self.node = FakeClient(mode="full")
455 self.u = upload.Uploader()
456 self.u.running = True
457 self.u.parent = self.node
459 def _should_fail(self, f):
460 self.failUnless(isinstance(f, Failure) and f.check(UploadUnhappinessError), f)
# A large upload against all-full servers must fail (checked via
# _should_fail on both callback and errback paths). The line creating
# `data` (line 463) is missing from this view.
462 def test_data_large(self):
464 d = upload_data(self.u, data)
465 d.addBoth(self._should_fail)
# Verify how the peer selector distributes shares across servers.
468 class PeerSelection(unittest.TestCase):
470 def make_client(self, num_servers=50):
471 self.node = FakeClient(mode="good", num_servers=num_servers)
472 self.u = upload.Uploader()
473 self.u.running = True
474 self.u.parent = self.node
476 def get_data(self, size):
479 def _check_large(self, newuri, size):
480 u = uri.from_string(newuri)
481 self.failUnless(isinstance(u, uri.CHKFileURI))
482 self.failUnless(isinstance(u.get_storage_index(), str))
483 self.failUnlessEqual(len(u.get_storage_index()), 16)
484 self.failUnless(isinstance(u.key, str))
485 self.failUnlessEqual(len(u.key), 16)
486 self.failUnlessEqual(u.size, size)
# Duplicate of SetDEPMixin.set_encoding_parameters; the lines building
# dict `p` (489-491, 493) are missing from this view.
488 def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
492 "max_segment_size": max_segsize,
494 self.node.DEFAULT_ENCODING_PARAMETERS = p
# Distribution tests: upload with various k/happy/n and assert how many
# shares each of the 50 fake peers was allocated and how many queries it
# received. Lines creating the client (e.g. self.make_client()), the
# `def _check(res):` headers, and trailing `return d` lines are missing
# from this view.
496 def test_one_each(self):
497 # if we have 50 shares, and there are 50 peers, and they all accept a
498 # share, we should get exactly one share per peer
501 data = self.get_data(SIZE_LARGE)
502 self.set_encoding_parameters(25, 30, 50)
503 d = upload_data(self.u, data)
504 d.addCallback(extract_uri)
505 d.addCallback(self._check_large, SIZE_LARGE)
507 for p in self.node.last_peers:
508 allocated = p.allocated
509 self.failUnlessEqual(len(allocated), 1)
510 self.failUnlessEqual(p.queries, 1)
511 d.addCallback(_check)
514 def test_two_each(self):
515 # if we have 100 shares, and there are 50 peers, and they all accept
516 # all shares, we should get exactly two shares per peer
519 data = self.get_data(SIZE_LARGE)
520 # if there are 50 peers, then happy needs to be <= 50
521 self.set_encoding_parameters(50, 50, 100)
522 d = upload_data(self.u, data)
523 d.addCallback(extract_uri)
524 d.addCallback(self._check_large, SIZE_LARGE)
526 for p in self.node.last_peers:
527 allocated = p.allocated
528 self.failUnlessEqual(len(allocated), 2)
529 self.failUnlessEqual(p.queries, 2)
530 d.addCallback(_check)
533 def test_one_each_plus_one_extra(self):
534 # if we have 51 shares, and there are 50 peers, then one peer gets
535 # two shares and the rest get just one
538 data = self.get_data(SIZE_LARGE)
539 self.set_encoding_parameters(24, 41, 51)
540 d = upload_data(self.u, data)
541 d.addCallback(extract_uri)
542 d.addCallback(self._check_large, SIZE_LARGE)
# The lines initializing got_one/got_two and appending to them are
# missing from this view.
546 for p in self.node.last_peers:
547 allocated = p.allocated
548 self.failUnless(len(allocated) in (1,2), len(allocated))
549 if len(allocated) == 1:
550 self.failUnlessEqual(p.queries, 1)
553 self.failUnlessEqual(p.queries, 2)
555 self.failUnlessEqual(len(got_one), 49)
556 self.failUnlessEqual(len(got_two), 1)
557 d.addCallback(_check)
560 def test_four_each(self):
561 # if we have 200 shares, and there are 50 peers, then each peer gets
562 # 4 shares. The design goal is to accomplish this with only two
566 data = self.get_data(SIZE_LARGE)
567 # if there are 50 peers, then happy should be no more than 50 if
568 # we want this to work.
569 self.set_encoding_parameters(100, 50, 200)
570 d = upload_data(self.u, data)
571 d.addCallback(extract_uri)
572 d.addCallback(self._check_large, SIZE_LARGE)
574 for p in self.node.last_peers:
575 allocated = p.allocated
576 self.failUnlessEqual(len(allocated), 4)
577 self.failUnlessEqual(p.queries, 2)
578 d.addCallback(_check)
581 def test_three_of_ten(self):
582 # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
586 data = self.get_data(SIZE_LARGE)
587 self.set_encoding_parameters(3, 3, 10)
588 d = upload_data(self.u, data)
589 d.addCallback(extract_uri)
590 d.addCallback(self._check_large, SIZE_LARGE)
# Build a histogram of shares-per-server and expect two servers with 3
# shares and one with 4. The `counts = {}` line is missing here.
593 for p in self.node.last_peers:
594 allocated = p.allocated
595 counts[len(allocated)] = counts.get(len(allocated), 0) + 1
596 histogram = [counts.get(i, 0) for i in range(5)]
597 self.failUnlessEqual(histogram, [0,0,0,2,1])
598 d.addCallback(_check)
601 def test_some_big_some_small(self):
602 # 10 shares, 20 servers, but half the servers don't support a
603 # share-size large enough for our file
604 mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)])
605 self.node = FakeClient(mode, num_servers=20)
606 self.u = upload.Uploader()
607 self.u.running = True
608 self.u.parent = self.node
610 data = self.get_data(SIZE_LARGE)
611 self.set_encoding_parameters(3, 5, 10)
612 d = upload_data(self.u, data)
613 d.addCallback(extract_uri)
614 d.addCallback(self._check_large, SIZE_LARGE)
616 # we should have put one share each on the big peers, and zero
617 # shares on the small peers
# The `total_allocated = 0` and `if p.mode == "good":` lines are missing
# from this view.
619 for p in self.node.last_peers:
621 self.failUnlessEqual(len(p.allocated), 1)
622 elif p.mode == "small":
623 self.failUnlessEqual(len(p.allocated), 0)
624 total_allocated += len(p.allocated)
625 self.failUnlessEqual(total_allocated, 10)
626 d.addCallback(_check)
# Storage-index derivation: the SI must depend on the file contents, the
# convergence secret, and the encoding parameters -- and a random key must
# give a fresh SI every time. The `def _check(res):` header and `return d`
# lines are missing from this view.
630 class StorageIndex(unittest.TestCase):
631 def test_params_must_matter(self):
632 DATA = "I am some data"
633 u = upload.Data(DATA, convergence="")
634 eu = upload.EncryptAnUploadable(u)
635 d1 = eu.get_storage_index()
637 # CHK means the same data should encrypt the same way
638 u = upload.Data(DATA, convergence="")
639 eu = upload.EncryptAnUploadable(u)
640 d1a = eu.get_storage_index()
642 # but if we use a different convergence string it should be different
643 u = upload.Data(DATA, convergence="wheee!")
644 eu = upload.EncryptAnUploadable(u)
645 d1salt1 = eu.get_storage_index()
647 # and if we add yet a different convergence it should be different again
648 u = upload.Data(DATA, convergence="NOT wheee!")
649 eu = upload.EncryptAnUploadable(u)
650 d1salt2 = eu.get_storage_index()
652 # and if we use the first string again it should be the same as last time
653 u = upload.Data(DATA, convergence="wheee!")
654 eu = upload.EncryptAnUploadable(u)
655 d1salt1a = eu.get_storage_index()
657 # and if we change the encoding parameters, it should be different (from the same convergence string with different encoding parameters)
658 u = upload.Data(DATA, convergence="")
659 u.encoding_param_k = u.default_encoding_param_k + 1
660 eu = upload.EncryptAnUploadable(u)
661 d2 = eu.get_storage_index()
663 # and if we use a random key, it should be different than the CHK
664 u = upload.Data(DATA, convergence=None)
665 eu = upload.EncryptAnUploadable(u)
666 d3 = eu.get_storage_index()
667 # and different from another instance
668 u = upload.Data(DATA, convergence=None)
669 eu = upload.EncryptAnUploadable(u)
670 d4 = eu.get_storage_index()
672 d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
674 si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
675 self.failUnlessEqual(si1, si1a)
676 self.failIfEqual(si1, si2)
677 self.failIfEqual(si1, si3)
678 self.failIfEqual(si1, si4)
679 self.failIfEqual(si3, si4)
680 self.failIfEqual(si1salt1, si1)
681 self.failIfEqual(si1salt1, si1salt2)
682 self.failIfEqual(si1salt2, si1)
683 self.failUnlessEqual(si1salt1, si1salt1a)
# Grid-based servers-of-happiness tests. NOTE(review): the base-class list
# continues on a line missing from this view (original line 688).
687 class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
# Perform a normal upload but abort the buckets of the first
# `servers_to_break` PeerTrackers before handing the share map to the
# Encoder. Uses Python-2 tuple parameter unpacking in _have_shareholders.
# Several lines (e.g. `buckets = {}` and the encoder start/return) are
# missing from this view.
689 def _do_upload_with_broken_servers(self, servers_to_break):
691 I act like a normal upload, but before I send the results of
692 Tahoe2PeerSelector to the Encoder, I break the first servers_to_break
693 PeerTrackers in the used_peers part of the return result.
695 assert self.g, "I tried to find a grid at self.g, but failed"
696 broker = self.g.clients[0].storage_broker
697 sh = self.g.clients[0]._secret_holder
698 data = upload.Data("data" * 10000, convergence="")
699 data.encoding_param_k = 3
700 data.encoding_param_happy = 4
701 data.encoding_param_n = 10
702 uploadable = upload.EncryptAnUploadable(data)
703 encoder = encode.Encoder()
704 encoder.set_encrypted_uploadable(uploadable)
705 status = upload.UploadStatus()
706 selector = upload.Tahoe2PeerSelector("dglev", "test", status)
707 storage_index = encoder.get_param("storage_index")
708 share_size = encoder.get_param("share_size")
709 block_size = encoder.get_param("block_size")
710 num_segments = encoder.get_param("num_segments")
711 d = selector.get_shareholders(broker, sh, storage_index,
712 share_size, block_size, num_segments,
714 def _have_shareholders((used_peers, already_peers)):
715 assert servers_to_break <= len(used_peers)
716 for index in xrange(servers_to_break):
717 server = list(used_peers)[index]
718 for share in server.buckets.keys():
719 server.buckets[share].abort()
721 servermap = already_peers.copy()
722 for peer in used_peers:
723 buckets.update(peer.buckets)
724 for bucket in peer.buckets:
725 servermap.setdefault(bucket, set()).add(peer.peerid)
726 encoder.set_shareholders(buckets, servermap)
729 d.addCallback(_have_shareholders)
# Add a new (optionally read-only) storage server to the NoNetworkGrid.
733 def _add_server(self, server_number, readonly=False):
734 assert self.g, "I tried to find a grid at self.g, but failed"
735 ss = self.g.make_server(server_number, readonly)
736 self.g.add_server(server_number, ss)
# Add a server and, if share_number is given, copy that share onto it.
# The signature continues on a missing line (740), presumably closing with
# readonly=False) -- confirm against the full file.
739 def _add_server_with_share(self, server_number, share_number=None,
741 self._add_server(server_number, readonly)
742 if share_number is not None:
743 self._copy_share_to_server(share_number, server_number)
# Copy share file `share_number` (recorded by _setup_and_upload in
# self.shares) into the share directory of server `server_number`, then
# verify via find_shares that the server now reports it. The filename
# component on line 760 and the closing of the failUnless (766-767) are
# missing from this view.
746 def _copy_share_to_server(self, share_number, server_number):
747 ss = self.g.servers_by_number[server_number]
748 # Copy share i from the directory associated with the first
749 # storage server to the directory associated with this one.
750 assert self.g, "I tried to find a grid at self.g, but failed"
751 assert self.shares, "I tried to find shares at self.shares, but failed"
752 old_share_location = self.shares[share_number][2]
753 new_share_location = os.path.join(ss.storedir, "shares")
754 si = uri.from_string(self.uri).get_storage_index()
755 new_share_location = os.path.join(new_share_location,
756 storage_index_to_dir(si))
757 if not os.path.exists(new_share_location):
758 os.makedirs(new_share_location)
759 new_share_location = os.path.join(new_share_location,
761 if old_share_location != new_share_location:
762 shutil.copy(old_share_location, new_share_location)
763 shares = self.find_shares(self.uri)
764 # Make sure that the storage server has the share.
765 self.failUnless((share_number, ss.my_nodeid, new_share_location)
768 def _setup_grid(self):
770 I set up a NoNetworkGrid with a single server and client.
772 self.set_up_grid(num_clients=1, num_servers=1)
# Set up a one-server grid, upload ~40kB, and stash the resulting URI and
# share data for later tests. The _setup_grid call, the _store_uri helper,
# the _store_shares body, and `return d` are missing from this view.
774 def _setup_and_upload(self, **kwargs):
776 I set up a NoNetworkGrid with a single server and client,
777 upload a file to it, store its uri in self.uri, and store its
778 sharedata in self.shares.
781 client = self.g.clients[0]
782 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
783 if "n" in kwargs and "k" in kwargs:
784 client.DEFAULT_ENCODING_PARAMETERS['k'] = kwargs['k']
785 client.DEFAULT_ENCODING_PARAMETERS['n'] = kwargs['n']
786 data = upload.Data("data" * 10000, convergence="")
788 d = client.upload(data)
791 d.addCallback(_store_uri)
792 d.addCallback(lambda ign:
793 self.find_shares(self.uri))
794 def _store_shares(shares):
796 d.addCallback(_store_shares)
# Verify that k/n from tahoe.cfg (injected via _set_up_nodes_extra_config,
# apparently 7-of-12 per the assertions) reach the uploaded file's checker
# results. The upload call and `def _check(cr):` lines are missing from
# this view.
800 def test_configure_parameters(self):
801 self.basedir = self.mktemp()
802 hooks = {0: self._set_up_nodes_extra_config}
803 self.set_up_grid(client_config_hooks=hooks)
804 c0 = self.g.clients[0]
807 u = upload.Data(DATA, convergence="")
809 d.addCallback(lambda ur: c0.create_node_from_uri(ur.uri))
810 m = monitor.Monitor()
811 d.addCallback(lambda fn: fn.check(m))
814 self.failUnlessEqual(data["count-shares-needed"], 7)
815 self.failUnlessEqual(data["count-shares-expected"], 12)
816 d.addCallback(_check)
820 def _setUp(self, ns):
821 # Used by test_happy_semantics and test_preexisting_share_behavior
822 # to set up the grid.
823 self.node = FakeClient(mode="good", num_servers=ns)
824 self.u = upload.Uploader()
825 self.u.running = True
826 self.u.parent = self.node
# happy=5 must fail with 2 servers and succeed with 10 and with 7. The
# _setUp(2) call, the upload_data calls inside the lambdas, and the
# reset-to-10/7-servers lines are missing from this view.
829 def test_happy_semantics(self):
831 DATA = upload.Data("kittens" * 10000, convergence="")
832 # These parameters are unsatisfiable with only 2 servers.
833 self.set_encoding_parameters(k=3, happy=5, n=10)
834 d = self.shouldFail(UploadUnhappinessError, "test_happy_semantics",
835 "shares could be placed or found on only 2 "
836 "server(s). We were asked to place shares on "
837 "at least 5 server(s) such that any 3 of them "
838 "have enough shares to recover the file",
840 # Let's reset the client to have 10 servers
841 d.addCallback(lambda ign:
843 # These parameters are satisfiable with 10 servers.
844 d.addCallback(lambda ign:
845 self.set_encoding_parameters(k=3, happy=5, n=10))
846 d.addCallback(lambda ign:
848 # Let's reset the client to have 7 servers
849 # (this is less than n, but more than h)
850 d.addCallback(lambda ign:
852 # These parameters are satisfiable with 7 servers.
853 d.addCallback(lambda ign:
854 self.set_encoding_parameters(k=3, happy=5, n=10))
855 d.addCallback(lambda ign:
# With 4 servers and happy=5 the upload fails; adding a 5th server must
# then let it succeed, which requires the first 4 servers' aborted
# BucketWriters to have released their claims. The initial upload call
# (line 874) and `return d` are missing from this view.
859 def test_aborted_shares(self):
860 self.basedir = "upload/EncodingParameters/aborted_shares"
861 self.set_up_grid(num_servers=4)
862 c = self.g.clients[0]
863 DATA = upload.Data(100* "kittens", convergence="")
864 # These parameters are unsatisfiable with only 4 servers, but should
865 # work with 5, as long as the original 4 are not stuck in the open
866 # BucketWriter state (open() but not
867 parms = {"k":2, "happy":5, "n":5, "max_segment_size": 1*MiB}
868 c.DEFAULT_ENCODING_PARAMETERS = parms
869 d = self.shouldFail(UploadUnhappinessError, "test_aborted_shares",
870 "shares could be placed on only 4 "
871 "server(s) such that any 2 of them have enough "
872 "shares to recover the file, but we were asked "
873 "to place shares on at least 5 such servers",
875 # now add the 5th server
876 d.addCallback(lambda ign: self._add_server(4, False))
877 # and this time the upload ought to succeed
878 d.addCallback(lambda ign: c.upload(DATA))
# Regression scenarios from ticket #778 comment:52: layouts where shares
# exist on enough servers but are not spread evenly enough for happy=4.
# Several lines (readonly= kwargs, _reset_encoding_parameters-style helper
# headers, convergence= closings, and `return d`) are missing from this
# view.
882 def test_problem_layout_comment_52(self):
884 self.basedir = self.mktemp()
886 # This scenario is at
887 # http://allmydata.org/trac/tahoe/ticket/778#comment:52
889 # The scenario in comment:52 proposes that we have a layout
891 # server 0: shares 1 - 9
892 # server 1: share 0, read-only
893 # server 2: share 0, read-only
894 # server 3: share 0, read-only
895 # To get access to the shares, we will first upload to one
896 # server, which will then have shares 0 - 9. We'll then
897 # add three new servers, configure them to not accept any new
898 # shares, then write share 0 directly into the serverdir of each,
899 # and then remove share 0 from server 0 in the same way.
900 # Then each of servers 1 - 3 will report that they have share 0,
901 # and will not accept any new share, while server 0 will report that
902 # it has shares 1 - 9 and will accept new shares.
903 # We'll then set 'happy' = 4, and see that an upload fails
905 d = self._setup_and_upload()
906 d.addCallback(lambda ign:
907 self._add_server_with_share(server_number=1, share_number=0,
909 d.addCallback(lambda ign:
910 self._add_server_with_share(server_number=2, share_number=0,
912 d.addCallback(lambda ign:
913 self._add_server_with_share(server_number=3, share_number=0,
915 # Remove the first share from server 0.
916 def _remove_share_0_from_server_0():
917 share_location = self.shares[0][2]
918 os.remove(share_location)
919 d.addCallback(lambda ign:
920 _remove_share_0_from_server_0())
921 # Set happy = 4 in the client.
923 client = self.g.clients[0]
924 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
926 d.addCallback(lambda ign:
928 # Uploading data should fail
929 d.addCallback(lambda client:
930 self.shouldFail(UploadUnhappinessError,
931 "test_problem_layout_comment_52_test_1",
932 "shares could be placed or found on 4 server(s), "
933 "but they are not spread out evenly enough to "
934 "ensure that any 3 of these servers would have "
935 "enough shares to recover the file. "
936 "We were asked to place shares on at "
937 "least 4 servers such that any 3 of them have "
938 "enough shares to recover the file",
939 client.upload, upload.Data("data" * 10000,
942 # Do comment:52, but like this:
944 # server 3: share 0, read-only
945 # server 1: share 0, read-only
946 # server 0: shares 0-9
947 d.addCallback(lambda ign:
949 d.addCallback(lambda ign:
950 self._setup_and_upload())
951 d.addCallback(lambda ign:
952 self._add_server(server_number=2))
953 d.addCallback(lambda ign:
954 self._add_server_with_share(server_number=3, share_number=0,
956 d.addCallback(lambda ign:
957 self._add_server_with_share(server_number=1, share_number=0,
960 client = self.g.clients[0]
961 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
963 d.addCallback(lambda ign:
965 d.addCallback(lambda client:
966 self.shouldFail(UploadUnhappinessError,
967 "test_problem_layout_comment_52_test_2",
968 "shares could be placed on only 3 server(s) such "
969 "that any 3 of them have enough shares to recover "
970 "the file, but we were asked to place shares on "
971 "at least 4 such servers.",
972 client.upload, upload.Data("data" * 10000,
# NOTE(review): this excerpt elides some interleaved source lines (visible as
# gaps in the embedded listing numbers); the comments below describe only the
# code that is visible here.
# Regression test for ticket #778 comment:53: three layouts in one Deferred
# chain, each checking that re-upload succeeds (or redistributes shares) when
# happy=4 and the shares start out concentrated on one server.
977 def test_problem_layout_comment_53(self):
978 # This scenario is at
979 # http://allmydata.org/trac/tahoe/ticket/778#comment:53
981 # Set up the grid to have one server
982 def _change_basedir(ign):
983 self.basedir = self.mktemp()
984 _change_basedir(None)
985 # We start by uploading all of the shares to one server.
986 # Next, we'll add three new servers to our NoNetworkGrid. We'll add
987 # one share from our initial upload to each of these.
988 # The counterintuitive ordering of the share numbers is to deal with
989 # the permuting of these servers -- distributing the shares this
990 # way ensures that the Tahoe2PeerSelector sees them in the order
992 d = self._setup_and_upload()
993 d.addCallback(lambda ign:
994 self._add_server_with_share(server_number=1, share_number=2))
995 d.addCallback(lambda ign:
996 self._add_server_with_share(server_number=2, share_number=0))
997 d.addCallback(lambda ign:
998 self._add_server_with_share(server_number=3, share_number=1))
999 # So, we now have the following layout:
1000 # server 0: shares 0 - 9
1004 # We change the 'happy' parameter in the client to 4.
1005 # The Tahoe2PeerSelector will see the peers permuted as:
1007 # Ideally, a reupload of our original data should work.
# _reset_encoding_parameters returns the client so later addCallbacks can
# receive it as their argument and call client.upload directly.
1008 def _reset_encoding_parameters(ign, happy=4):
1009 client = self.g.clients[0]
1010 client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1012 d.addCallback(_reset_encoding_parameters)
1013 d.addCallback(lambda client:
1014 client.upload(upload.Data("data" * 10000, convergence="")))
# Second scenario: same shape, but the all-shares server is seen first.
1017 # This scenario is basically comment:53, but changed so that the
1018 # Tahoe2PeerSelector sees the server with all of the shares before
1019 # any of the other servers.
1021 # server 2: shares 0 - 9
1025 # The Tahoe2PeerSelector sees the peers permuted as:
1027 # Note that server 0 has been replaced by server 4; this makes it
1028 # easier to ensure that the last server seen by Tahoe2PeerSelector
1029 # has only one share.
1030 d.addCallback(_change_basedir)
1031 d.addCallback(lambda ign:
1032 self._setup_and_upload())
1033 d.addCallback(lambda ign:
1034 self._add_server_with_share(server_number=2, share_number=0))
1035 d.addCallback(lambda ign:
1036 self._add_server_with_share(server_number=3, share_number=1))
1037 d.addCallback(lambda ign:
1038 self._add_server_with_share(server_number=1, share_number=2))
1039 # Copy all of the other shares to server number 2
1040 def _copy_shares(ign):
1041 for i in xrange(0, 10):
1042 self._copy_share_to_server(i, 2)
1043 d.addCallback(_copy_shares)
1044 # Remove the first server, and add a placeholder with share 0
1045 d.addCallback(lambda ign:
1046 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1047 d.addCallback(lambda ign:
1048 self._add_server_with_share(server_number=4, share_number=0))
1049 # Now try uploading.
1050 d.addCallback(_reset_encoding_parameters)
1051 d.addCallback(lambda client:
1052 client.upload(upload.Data("data" * 10000, convergence="")))
# Third scenario: the extra servers start empty, so the selector must
# actively redistribute rather than just discover an existing layout.
1055 # Try the same thing, but with empty servers after the first one
1056 # We want to make sure that Tahoe2PeerSelector will redistribute
1057 # shares as necessary, not simply discover an existing layout.
1059 # server 2: shares 0 - 9
1063 d.addCallback(_change_basedir)
1064 d.addCallback(lambda ign:
1065 self._setup_and_upload())
1066 d.addCallback(lambda ign:
1067 self._add_server(server_number=2))
1068 d.addCallback(lambda ign:
1069 self._add_server(server_number=3))
1070 d.addCallback(lambda ign:
1071 self._add_server(server_number=1))
1072 d.addCallback(lambda ign:
1073 self._add_server(server_number=4))
1074 d.addCallback(_copy_shares)
1075 d.addCallback(lambda ign:
1076 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1077 d.addCallback(_reset_encoding_parameters)
1078 d.addCallback(lambda client:
1079 client.upload(upload.Data("data" * 10000, convergence="")))
1080 # Make sure that only as many shares as necessary to satisfy
1081 # servers of happiness were pushed.
1082 d.addCallback(lambda results:
1083 self.failUnlessEqual(results.pushed_shares, 3))
# Regression test for ticket #1124: with k=2/n=4 and shares already spread
# over four servers, an upload with happy=4 should be satisfied by the
# existing distribution. Marked .todo below (known-failing until post-1.7.1).
# NOTE(review): the enclosing `def _setup(ign):` line is elided from this
# excerpt — the indented body below through the happy=4 assignment presumably
# belongs to it, since `_setup` is added as a callback further down; confirm
# against the full source.
1086 def test_problem_layout_ticket1124(self):
1087 self.basedir = self.mktemp()
1088 d = self._setup_and_upload(k=2, n=4)
1090 # server 0: shares 0, 1, 2, 3
1091 # server 1: shares 0, 3
1094 # With this layout, an upload should just be satisfied that the current distribution is good enough, right?
1096 self._add_server_with_share(server_number=0, share_number=None)
1097 self._add_server_with_share(server_number=1, share_number=0)
1098 self._add_server_with_share(server_number=2, share_number=1)
1099 self._add_server_with_share(server_number=3, share_number=2)
1101 self._copy_share_to_server(3, 1)
1102 client = self.g.clients[0]
1103 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1106 d.addCallback(_setup)
1107 d.addCallback(lambda client:
1108 client.upload(upload.Data("data" * 10000, convergence="")))
1110 test_problem_layout_ticket1124.todo = "Fix this after 1.7.1 release."
# NOTE(review): some interleaved source lines are elided from this excerpt
# (e.g. the `readonly=True` continuation lines of several
# _add_server_with_share calls appear cut mid-argument-list).
# Checks that read-only servers holding shares still count toward
# servers-of-happiness when happy=4.
1112 def test_happiness_with_some_readonly_peers(self):
1113 # Try the following layout
1114 # server 2: shares 0-9
1115 # server 4: share 0, read-only
1116 # server 3: share 1, read-only
1117 # server 1: share 2, read-only
1118 self.basedir = self.mktemp()
1119 d = self._setup_and_upload()
1120 d.addCallback(lambda ign:
1121 self._add_server_with_share(server_number=2, share_number=0))
1122 d.addCallback(lambda ign:
1123 self._add_server_with_share(server_number=3, share_number=1,
1125 d.addCallback(lambda ign:
1126 self._add_server_with_share(server_number=1, share_number=2,
1128 # Copy all of the other shares to server number 2
1129 def _copy_shares(ign):
1130 for i in xrange(1, 10):
1131 self._copy_share_to_server(i, 2)
1132 d.addCallback(_copy_shares)
1133 # Remove server 0, and add another in its place
1134 d.addCallback(lambda ign:
1135 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1136 d.addCallback(lambda ign:
1137 self._add_server_with_share(server_number=4, share_number=0,
# _reset_encoding_parameters returns the client so the next callback can
# upload through it.
1139 def _reset_encoding_parameters(ign, happy=4):
1140 client = self.g.clients[0]
1141 client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1143 d.addCallback(_reset_encoding_parameters)
1144 d.addCallback(lambda client:
1145 client.upload(upload.Data("data" * 10000, convergence="")))
# NOTE(review): some interleaved source lines are elided from this excerpt
# (several _add_server_with_share calls are cut mid-argument-list, presumably
# before a `readonly=True` continuation — confirm against the full source).
# Same idea as the previous test, but every remaining server is read-only:
# the read-only survey must not undercount servers of happiness.
1149 def test_happiness_with_all_readonly_peers(self):
1150 # server 3: share 1, read-only
1151 # server 1: share 2, read-only
1152 # server 2: shares 0-9, read-only
1153 # server 4: share 0, read-only
1154 # The idea with this test is to make sure that the survey of
1155 # read-only peers doesn't undercount servers of happiness
1156 self.basedir = self.mktemp()
1157 d = self._setup_and_upload()
1158 d.addCallback(lambda ign:
1159 self._add_server_with_share(server_number=4, share_number=0,
1161 d.addCallback(lambda ign:
1162 self._add_server_with_share(server_number=3, share_number=1,
1164 d.addCallback(lambda ign:
1165 self._add_server_with_share(server_number=1, share_number=2,
1167 d.addCallback(lambda ign:
1168 self._add_server_with_share(server_number=2, share_number=0,
# Copy shares 1-9 onto server 2 so it holds the full set.
1170 def _copy_shares(ign):
1171 for i in xrange(1, 10):
1172 self._copy_share_to_server(i, 2)
1173 d.addCallback(_copy_shares)
1174 d.addCallback(lambda ign:
1175 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1176 def _reset_encoding_parameters(ign, happy=4):
1177 client = self.g.clients[0]
1178 client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1180 d.addCallback(_reset_encoding_parameters)
1181 d.addCallback(lambda client:
1182 client.upload(upload.Data("data" * 10000, convergence="")))
# NOTE(review): some interleaved source lines are elided from this excerpt
# (e.g. the first three lines of _do_server_setup_2's body).
# Exercises the Encoder's own happiness re-check when buckets are lost after
# peer selection: losing 1 of 5 servers keeps happy=4 satisfiable; losing 2
# must raise UploadUnhappinessError. Then repeats with a mix of read-only
# and breakable servers.
1186 def test_dropped_servers_in_encoder(self):
1187 # The Encoder does its own "servers_of_happiness" check if it
1188 # happens to lose a bucket during an upload (it assumes that
1189 # the layout presented to it satisfies "servers_of_happiness"
1190 # until a failure occurs)
1192 # This test simulates an upload where servers break after peer
1193 # selection, but before they are written to.
1194 def _set_basedir(ign=None):
1195 self.basedir = self.mktemp()
1197 d = self._setup_and_upload();
1199 def _do_server_setup(ign):
1200 self._add_server(server_number=1)
1201 self._add_server(server_number=2)
1202 self._add_server(server_number=3)
1203 self._add_server(server_number=4)
1204 self._add_server(server_number=5)
1205 d.addCallback(_do_server_setup)
1206 # remove the original server
1207 # (necessary to ensure that the Tahoe2PeerSelector will distribute
1209 def _remove_server(ign):
1210 server = self.g.servers_by_number[0]
1211 self.g.remove_server(server.my_nodeid)
1212 d.addCallback(_remove_server)
1213 # This should succeed; we still have 4 servers, and the
1214 # happiness of the upload is 4.
1215 d.addCallback(lambda ign:
1216 self._do_upload_with_broken_servers(1))
1217 # Now, do the same thing over again, but drop 2 servers instead
1218 # of 1. This should fail, because servers_of_happiness is 4 and
1219 # we can't satisfy that.
1220 d.addCallback(_set_basedir)
1221 d.addCallback(lambda ign:
1222 self._setup_and_upload())
1223 d.addCallback(_do_server_setup)
1224 d.addCallback(_remove_server)
1225 d.addCallback(lambda ign:
1226 self.shouldFail(UploadUnhappinessError,
1227 "test_dropped_servers_in_encoder",
1228 "shares could be placed on only 3 server(s) "
1229 "such that any 3 of them have enough shares to "
1230 "recover the file, but we were asked to place "
1231 "shares on at least 4",
1232 self._do_upload_with_broken_servers, 2))
1233 # Now do the same thing over again, but make some of the servers
1234 # readonly, break some of the ones that aren't, and make sure that
1235 # happiness accounting is preserved.
1236 d.addCallback(_set_basedir)
1237 d.addCallback(lambda ign:
1238 self._setup_and_upload())
1239 def _do_server_setup_2(ign):
1243 self._add_server_with_share(4, 7, readonly=True)
1244 self._add_server_with_share(5, 8, readonly=True)
1245 d.addCallback(_do_server_setup_2)
1246 d.addCallback(_remove_server)
1247 d.addCallback(lambda ign:
1248 self._do_upload_with_broken_servers(1))
1249 d.addCallback(_set_basedir)
1250 d.addCallback(lambda ign:
1251 self._setup_and_upload())
1252 d.addCallback(_do_server_setup_2)
1253 d.addCallback(_remove_server)
1254 d.addCallback(lambda ign:
1255 self.shouldFail(UploadUnhappinessError,
1256 "test_dropped_servers_in_encoder",
1257 "shares could be placed on only 3 server(s) "
1258 "such that any 3 of them have enough shares to "
1259 "recover the file, but we were asked to place "
1260 "shares on at least 4",
1261 self._do_upload_with_broken_servers, 2))
# NOTE(review): several lines are elided from this excerpt — in particular
# the `shares = {` / `expected = {` / `shares2 = {` dict-literal openers,
# the FakePeerTracker body, and the loop bodies that populate `trackers`.
# Unit test for happinessutil.merge_peers: merging a shnum->set(peerid) map
# with a set of tracker objects (here faked) must union, not overwrite.
1265 def test_merge_peers(self):
1266 # merge_peers merges a list of used_peers and a dict of
1267 # shareid -> peerid mappings.
1269 1 : set(["server1"]),
1270 2 : set(["server2"]),
1271 3 : set(["server3"]),
1272 4 : set(["server4", "server5"]),
1273 5 : set(["server1", "server2"]),
1275 # if not provided with a used_peers argument, it should just
1276 # return the first argument unchanged.
1277 self.failUnlessEqual(shares, merge_peers(shares, set([])))
1278 class FakePeerTracker:
1281 for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1282 t = FakePeerTracker()
1287 1 : set(["server1"]),
1288 2 : set(["server2"]),
1289 3 : set(["server3"]),
1290 4 : set(["server4", "server5"]),
1291 5 : set(["server1", "server2", "server5"]),
1292 6 : set(["server6"]),
1293 7 : set(["server7"]),
1294 8 : set(["server8"]),
1296 self.failUnlessEqual(expected, merge_peers(shares, set(trackers)))
1299 5 : set(["server5"]),
1300 6 : set(["server6"]),
1301 7 : set(["server7"]),
1302 8 : set(["server8"]),
1304 self.failUnlessEqual(expected, merge_peers(shares2, set(trackers)))
1308 for (i, server) in [(i, "server%d" % i) for i in xrange(10)]:
1309 shares3[i] = set([server])
1310 t = FakePeerTracker()
1314 expected[i] = set([server])
1315 self.failUnlessEqual(expected, merge_peers(shares3, set(trackers)))
# NOTE(review): several lines are elided from this excerpt — dict-literal
# openers (`test1 = {`, `test = {`), FakePeerTracker's body, and the lines
# that append trackers / assign bucket sets.
# Unit test for happinessutil.servers_of_happiness: the returned value is
# the size of a maximum matching between peerids and share numbers.
1318 def test_servers_of_happiness_utility_function(self):
1319 # These tests are concerned with the servers_of_happiness()
1320 # utility function, and its underlying matching algorithm. Other
1321 # aspects of the servers_of_happiness behavior are tested
1322 # elsewhere. These tests exist to ensure that
1323 # servers_of_happiness doesn't under or overcount the happiness
1324 # value for given inputs.
1326 # servers_of_happiness expects a dict of
1327 # shnum => set(peerids) as a preexisting shares argument.
1329 1 : set(["server1"]),
1330 2 : set(["server2"]),
1331 3 : set(["server3"]),
1332 4 : set(["server4"])
1334 happy = servers_of_happiness(test1)
1335 self.failUnlessEqual(4, happy)
1336 test1[4] = set(["server1"])
1337 # We've added a duplicate server, so now servers_of_happiness
1338 # should be 3 instead of 4.
1339 happy = servers_of_happiness(test1)
1340 self.failUnlessEqual(3, happy)
1341 # The second argument of merge_peers should be a set of
1342 # objects with peerid and buckets as attributes. In actual use,
1343 # these will be PeerTracker instances, but for testing it is fine
1344 # to make a FakePeerTracker whose job is to hold those instance
1345 # variables to test that part.
1346 class FakePeerTracker:
1349 for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1350 t = FakePeerTracker()
1354 # Recall that test1 is a server layout with servers_of_happiness
1355 # = 3. Since there isn't any overlap between the shnum ->
1356 # set([peerid]) correspondences in test1 and those in trackers,
1357 # the result here should be 7.
1358 test2 = merge_peers(test1, set(trackers))
1359 happy = servers_of_happiness(test2)
1360 self.failUnlessEqual(7, happy)
1361 # Now add an overlapping server to trackers. This is redundant,
1362 # so it should not cause the previously reported happiness value
1364 t = FakePeerTracker()
1365 t.peerid = "server1"
1368 test2 = merge_peers(test1, set(trackers))
1369 happy = servers_of_happiness(test2)
1370 self.failUnlessEqual(7, happy)
# An empty layout has zero happiness (the `test = {}` assignment appears
# to be among the elided lines).
1372 happy = servers_of_happiness(test)
1373 self.failUnlessEqual(0, happy)
1374 # Test a more substantial overlap between the trackers and the
1375 # existing assignments.
1377 1 : set(['server1']),
1378 2 : set(['server2']),
1379 3 : set(['server3']),
1380 4 : set(['server4']),
1383 t = FakePeerTracker()
1384 t.peerid = 'server5'
1387 t = FakePeerTracker()
1388 t.peerid = 'server6'
1391 # The value returned by servers_of_happiness is the size
1392 # of a maximum matching in the bipartite graph that
1393 # servers_of_happiness() makes between peerids and share
1394 # numbers. It should find something like this:
1395 # (server 1, share 1)
1396 # (server 2, share 2)
1397 # (server 3, share 3)
1398 # (server 5, share 4)
1399 # (server 6, share 5)
1401 # and, since there are 5 edges in this matching, it should
1403 test2 = merge_peers(test, set(trackers))
1404 happy = servers_of_happiness(test2)
1405 self.failUnlessEqual(5, happy)
1406 # Zooko's first puzzle:
1407 # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:156)
1409 # server 1: shares 0, 1
1410 # server 2: shares 1, 2
1413 # This should yield happiness of 3.
1415 0 : set(['server1']),
1416 1 : set(['server1', 'server2']),
1417 2 : set(['server2', 'server3']),
1419 self.failUnlessEqual(3, servers_of_happiness(test))
1420 # Zooko's second puzzle:
1421 # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:158)
1423 # server 1: shares 0, 1
1426 # This should yield happiness of 2.
1428 0 : set(['server1']),
1429 1 : set(['server1', 'server2']),
1431 self.failUnlessEqual(2, servers_of_happiness(test))
# NOTE(review): the `test1 = {` opener and the expected-message line of the
# shouldFail call appear to be elided from this excerpt.
# Unit test for happinessutil.shares_by_server: inverts a
# shnum -> set(peerids) map into peerid -> set(shnums), and asserts on a
# non-set value (plain string) via AssertionError.
1434 def test_shares_by_server(self):
1435 test = dict([(i, set(["server%d" % i])) for i in xrange(1, 5)])
1436 sbs = shares_by_server(test)
1437 self.failUnlessEqual(set([1]), sbs["server1"])
1438 self.failUnlessEqual(set([2]), sbs["server2"])
1439 self.failUnlessEqual(set([3]), sbs["server3"])
1440 self.failUnlessEqual(set([4]), sbs["server4"])
1442 1 : set(["server1"]),
1443 2 : set(["server1"]),
1444 3 : set(["server1"]),
1445 4 : set(["server2"]),
1446 5 : set(["server2"])
1448 sbs = shares_by_server(test1)
1449 self.failUnlessEqual(set([1, 2, 3]), sbs["server1"])
1450 self.failUnlessEqual(set([4, 5]), sbs["server2"])
1451 # This should fail unless the peerid part of the mapping is a set
1452 test2 = {1: "server1"}
1453 self.shouldFail(AssertionError,
1454 "test_shares_by_server",
1456 shares_by_server, test2)
# Checks that the peer selector counts shares held by a read-only server
# (server 1) toward happiness instead of filtering that server out.
# NOTE(review): a few layout-comment lines are elided from this excerpt.
1459 def test_existing_share_detection(self):
1460 self.basedir = self.mktemp()
1461 d = self._setup_and_upload()
1462 # Our final setup should look like this:
1463 # server 1: shares 0 - 9, read-only
1467 # The purpose of this test is to make sure that the peer selector
1468 # knows about the shares on server 1, even though it is read-only.
1469 # It used to simply filter these out, which would cause the test
1470 # to fail when servers_of_happiness = 4.
1471 d.addCallback(lambda ign:
1472 self._add_server_with_share(1, 0, True))
1473 d.addCallback(lambda ign:
1474 self._add_server(2))
1475 d.addCallback(lambda ign:
1476 self._add_server(3))
1477 d.addCallback(lambda ign:
1478 self._add_server(4))
# Give server 1 the rest of the shares (it already has share 0).
1479 def _copy_shares(ign):
1480 for i in xrange(1, 10):
1481 self._copy_share_to_server(i, 1)
1482 d.addCallback(_copy_shares)
1483 d.addCallback(lambda ign:
1484 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
# _prepare_client sets happy=4 and (per the following addCallback) must
# return the client — its return line is elided from this excerpt.
1485 def _prepare_client(ign):
1486 client = self.g.clients[0]
1487 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1489 d.addCallback(_prepare_client)
1490 d.addCallback(lambda client:
1491 client.upload(upload.Data("data" * 10000, convergence="")))
# NOTE(review): the `def _setup(ign):`, `def _reset(ign):`, `def _then(ign):`
# and `def _next(ign):` header lines for the indented helper bodies below are
# elided from this excerpt, as are their trailing `return c` lines — confirm
# against the full source.
# Verifies the query-count diagnostics in UploadUnhappinessError messages:
# happy is set to an unsatisfiable 45 so the upload fails and the message
# (with its query statistics) becomes observable.
1495 def test_query_counting(self):
1496 # If peer selection fails, Tahoe2PeerSelector prints out a lot
1497 # of helpful diagnostic information, including query stats.
1498 # This test helps make sure that that information is accurate.
1499 self.basedir = self.mktemp()
1500 d = self._setup_and_upload()
1502 for i in xrange(1, 11):
1503 self._add_server(server_number=i)
1504 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1505 c = self.g.clients[0]
1506 # We set happy to an unsatisfiable value so that we can check the
1507 # counting in the exception message. The same progress message
1508 # is also used when the upload is successful, but in that case it
1509 # only gets written to a log, so we can't see what it says.
1510 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1512 d.addCallback(_setup)
1513 d.addCallback(lambda c:
1514 self.shouldFail(UploadUnhappinessError, "test_query_counting",
1515 "10 queries placed some shares",
1516 c.upload, upload.Data("data" * 10000,
1518 # Now try with some readonly servers. We want to make sure that
1519 # the readonly peer share discovery phase is counted correctly.
1521 self.basedir = self.mktemp()
1523 d.addCallback(_reset)
1524 d.addCallback(lambda ign:
1525 self._setup_and_upload())
1527 for i in xrange(1, 11):
1528 self._add_server(server_number=i)
1529 self._add_server(server_number=11, readonly=True)
1530 self._add_server(server_number=12, readonly=True)
1531 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1532 c = self.g.clients[0]
1533 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1535 d.addCallback(_then)
1536 d.addCallback(lambda c:
1537 self.shouldFail(UploadUnhappinessError, "test_query_counting",
1538 "2 placed none (of which 2 placed none due to "
1539 "the server being full",
1540 c.upload, upload.Data("data" * 10000,
1542 # Now try the case where the upload process finds a bunch of the
1543 # shares that it wants to place on the first server, including
1544 # the one that it wanted to allocate there. Though no shares will
1545 # be allocated in this request, it should still be called
1546 # productive, since it caused some homeless shares to be
1548 d.addCallback(_reset)
1549 d.addCallback(lambda ign:
1550 self._setup_and_upload())
1553 for i in xrange(1, 11):
1554 self._add_server(server_number=i)
1555 # Copy all of the shares to server 9, since that will be
1556 # the first one that the selector sees.
1557 for i in xrange(10):
1558 self._copy_share_to_server(i, 9)
1559 # Remove server 0, and its contents
1560 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1561 # Make happiness unsatisfiable
1562 c = self.g.clients[0]
1563 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1565 d.addCallback(_next)
1566 d.addCallback(lambda c:
1567 self.shouldFail(UploadUnhappinessError, "test_query_counting",
1568 "1 queries placed some shares",
1569 c.upload, upload.Data("data" * 10000,
# NOTE(review): the `def _then(ign):` header for the indented helper body,
# its `return c` line, and the first positional argument of shouldFail's
# callable appear to be elided from this excerpt.
# With 10 read-only servers and k=2/n=4, the selector should cap its
# read-only share-discovery queries (message pins "sent 8 queries to 8
# peers") before failing with UploadUnhappinessError.
1574 def test_upper_limit_on_readonly_queries(self):
1575 self.basedir = self.mktemp()
1576 d = self._setup_and_upload()
1578 for i in xrange(1, 11):
1579 self._add_server(server_number=i, readonly=True)
1580 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1581 c = self.g.clients[0]
1582 c.DEFAULT_ENCODING_PARAMETERS['k'] = 2
1583 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1584 c.DEFAULT_ENCODING_PARAMETERS['n'] = 4
1586 d.addCallback(_then)
1587 d.addCallback(lambda client:
1588 self.shouldFail(UploadUnhappinessError,
1589 "test_upper_limit_on_readonly_queries",
1590 "sent 8 queries to 8 peers",
1592 upload.Data('data' * 10000, convergence="")))
# NOTE(review): several interleaved source lines are elided from this
# excerpt (e.g. the `def _reset(ign):` header, some `client.upload` callable
# arguments inside shouldFail, and a few `self._setup_and_upload()` lambda
# bodies). Comments below describe only the visible code.
# Exercises the exact wording of UploadUnhappinessError for five distinct
# failure shapes: all-readonly, readonly-plus-broken, too-few-servers,
# happy>servers, and shares-present-but-unevenly-spread.
1596 def test_exception_messages_during_peer_selection(self):
1597 # server 1: read-only, no shares
1598 # server 2: read-only, no shares
1599 # server 3: read-only, no shares
1600 # server 4: read-only, no shares
1601 # server 5: read-only, no shares
1602 # This will fail, but we want to make sure that the log messages
1603 # are informative about why it has failed.
1604 self.basedir = self.mktemp()
1605 d = self._setup_and_upload()
1606 d.addCallback(lambda ign:
1607 self._add_server(server_number=1, readonly=True))
1608 d.addCallback(lambda ign:
1609 self._add_server(server_number=2, readonly=True))
1610 d.addCallback(lambda ign:
1611 self._add_server(server_number=3, readonly=True))
1612 d.addCallback(lambda ign:
1613 self._add_server(server_number=4, readonly=True))
1614 d.addCallback(lambda ign:
1615 self._add_server(server_number=5, readonly=True))
1616 d.addCallback(lambda ign:
1617 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1618 def _reset_encoding_parameters(ign, happy=4):
1619 client = self.g.clients[0]
1620 client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1622 d.addCallback(_reset_encoding_parameters)
1623 d.addCallback(lambda client:
1624 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1625 "placed 0 shares out of 10 "
1626 "total (10 homeless), want to place shares on at "
1627 "least 4 servers such that any 3 of them have "
1628 "enough shares to recover the file, "
1629 "sent 5 queries to 5 peers, 0 queries placed "
1630 "some shares, 5 placed none "
1631 "(of which 5 placed none due to the server being "
1632 "full and 0 placed none due to an error)",
1634 upload.Data("data" * 10000, convergence="")))
# Second shape: same layout but server 2 is broken rather than read-only,
# so the "due to an error" counter must read 1.
1637 # server 1: read-only, no shares
1638 # server 2: broken, no shares
1639 # server 3: read-only, no shares
1640 # server 4: read-only, no shares
1641 # server 5: read-only, no shares
1643 self.basedir = self.mktemp()
1644 d.addCallback(_reset)
1645 d.addCallback(lambda ign:
1646 self._setup_and_upload())
1647 d.addCallback(lambda ign:
1648 self._add_server(server_number=1, readonly=True))
1649 d.addCallback(lambda ign:
1650 self._add_server(server_number=2))
1651 def _break_server_2(ign):
1652 server = self.g.servers_by_number[2].my_nodeid
1653 # We have to break the server in servers_by_id,
1654 # because the one in servers_by_number isn't wrapped,
1655 # and doesn't look at its broken attribute when answering
1657 self.g.servers_by_id[server].broken = True
1658 d.addCallback(_break_server_2)
1659 d.addCallback(lambda ign:
1660 self._add_server(server_number=3, readonly=True))
1661 d.addCallback(lambda ign:
1662 self._add_server(server_number=4, readonly=True))
1663 d.addCallback(lambda ign:
1664 self._add_server(server_number=5, readonly=True))
1665 d.addCallback(lambda ign:
1666 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1667 d.addCallback(_reset_encoding_parameters)
1668 d.addCallback(lambda client:
1669 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1670 "placed 0 shares out of 10 "
1671 "total (10 homeless), want to place shares on at "
1672 "least 4 servers such that any 3 of them have "
1673 "enough shares to recover the file, "
1674 "sent 5 queries to 5 peers, 0 queries placed "
1675 "some shares, 5 placed none "
1676 "(of which 4 placed none due to the server being "
1677 "full and 1 placed none due to an error)",
1679 upload.Data("data" * 10000, convergence="")))
1680 # server 0, server 1 = empty, accepting shares
1681 # This should place all of the shares, but still fail with happy=4.
1682 # We want to make sure that the exception message is worded correctly.
1683 d.addCallback(_reset)
1684 d.addCallback(lambda ign:
1686 d.addCallback(lambda ign:
1687 self._add_server(server_number=1))
1688 d.addCallback(_reset_encoding_parameters)
1689 d.addCallback(lambda client:
1690 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1691 "shares could be placed or found on only 2 "
1692 "server(s). We were asked to place shares on at "
1693 "least 4 server(s) such that any 3 of them have "
1694 "enough shares to recover the file.",
1695 client.upload, upload.Data("data" * 10000,
1697 # servers 0 - 4 = empty, accepting shares
1698 # This too should place all the shares, and this too should fail,
1699 # but since the effective happiness is more than the k encoding
1700 # parameter, it should trigger a different error message than the one
1702 d.addCallback(_reset)
1703 d.addCallback(lambda ign:
1705 d.addCallback(lambda ign:
1706 self._add_server(server_number=1))
1707 d.addCallback(lambda ign:
1708 self._add_server(server_number=2))
1709 d.addCallback(lambda ign:
1710 self._add_server(server_number=3))
1711 d.addCallback(lambda ign:
1712 self._add_server(server_number=4))
1713 d.addCallback(_reset_encoding_parameters, happy=7)
1714 d.addCallback(lambda client:
1715 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1716 "shares could be placed on only 5 server(s) such "
1717 "that any 3 of them have enough shares to recover "
1718 "the file, but we were asked to place shares on "
1719 "at least 7 such servers.",
1720 client.upload, upload.Data("data" * 10000,
1722 # server 0: shares 0 - 9
1723 # server 1: share 0, read-only
1724 # server 2: share 0, read-only
1725 # server 3: share 0, read-only
1726 # This should place all of the shares, but fail with happy=4.
1727 # Since the number of servers with shares is more than the number
1728 # necessary to reconstitute the file, this will trigger a different
1729 # error message than either of those above.
1730 d.addCallback(_reset)
1731 d.addCallback(lambda ign:
1732 self._setup_and_upload())
1733 d.addCallback(lambda ign:
1734 self._add_server_with_share(server_number=1, share_number=0,
1736 d.addCallback(lambda ign:
1737 self._add_server_with_share(server_number=2, share_number=0,
1739 d.addCallback(lambda ign:
1740 self._add_server_with_share(server_number=3, share_number=0,
1742 d.addCallback(_reset_encoding_parameters, happy=7)
1743 d.addCallback(lambda client:
1744 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1745 "shares could be placed or found on 4 server(s), "
1746 "but they are not spread out evenly enough to "
1747 "ensure that any 3 of these servers would have "
1748 "enough shares to recover the file. We were asked "
1749 "to place shares on at least 7 servers such that "
1750 "any 3 of them have enough shares to recover the "
1752 client.upload, upload.Data("data" * 10000,
# Regression test for #778 comment:187 — a layout that broke an early share
# redistribution algorithm. Marked .todo below (known-failing).
# NOTE(review): the `def _setup(ign):` header and some argument-continuation
# lines (presumably `readonly=True`) are elided from this excerpt.
1757 def test_problem_layout_comment_187(self):
1758 # #778 comment 187 broke an initial attempt at a share
1759 # redistribution algorithm. This test is here to demonstrate the
1760 # breakage, and to test that subsequent algorithms don't also
1761 # break in the same way.
1762 self.basedir = self.mktemp()
1763 d = self._setup_and_upload(k=2, n=3)
1765 # server 1: shares 0, 1, 2, readonly
1766 # server 2: share 0, readonly
1769 self._add_server_with_share(server_number=1, share_number=0,
1771 self._add_server_with_share(server_number=2, share_number=0,
1773 self._add_server_with_share(server_number=3, share_number=0)
1775 self._copy_share_to_server(1, 1)
1776 self._copy_share_to_server(2, 1)
1778 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1779 client = self.g.clients[0]
1780 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 3
1783 d.addCallback(_setup)
1784 d.addCallback(lambda client:
1785 client.upload(upload.Data("data" * 10000, convergence="")))
1787 test_problem_layout_comment_187.todo = "this isn't fixed yet"
# Regression test for #1118: a user-reported layout that tripped an
# assertion in the upload code. Server 0 is emptied (storedir wiped and
# recreated) so it accepts shares but holds none.
# NOTE(review): the `def _setup(ign):` header, the os.mkdir (or similar)
# that recreates the storedir, and the `return client` line are elided from
# this excerpt.
1789 def test_problem_layout_ticket_1118(self):
1790 # #1118 includes a report from a user who hit an assertion in
1791 # the upload code with this layout.
1792 self.basedir = self.mktemp()
1793 d = self._setup_and_upload(k=2, n=4)
1795 # server 0: no shares
1796 # server 1: shares 0, 3
1799 # The order that they get queries is 0, 1, 3, 2
1801 self._add_server(server_number=0)
1802 self._add_server_with_share(server_number=1, share_number=0)
1803 self._add_server_with_share(server_number=2, share_number=2)
1804 self._add_server_with_share(server_number=3, share_number=1)
1806 self._copy_share_to_server(3, 1)
1807 storedir = self.get_serverdir(0)
1808 # remove the storedir, wiping out any existing shares
1809 shutil.rmtree(storedir)
1810 # create an empty storedir to replace the one we just removed
1812 client = self.g.clients[0]
1813 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1816 d.addCallback(_setup)
1817 d.addCallback(lambda client:
1818 client.upload(upload.Data("data" * 10000, convergence="")))
# An upload that satisfies happy=4 but cannot place every share (all four
# remaining servers are read-only with one share each) must still be
# declared successful.
# NOTE(review): the `readonly=True` continuation lines of the four
# _add_server_with_share calls and `return c` appear elided from this
# excerpt.
1821 def test_upload_succeeds_with_some_homeless_shares(self):
1822 # If the upload is forced to stop trying to place shares before
1823 # it has placed (or otherwise accounted) for all of them, but it
1824 # has placed enough to satisfy the upload health criteria that
1825 # we're using, it should still succeed.
1826 self.basedir = self.mktemp()
1827 d = self._setup_and_upload()
1828 def _server_setup(ign):
1829 # Add four servers so that we have a layout like this:
1830 # server 1: share 0, read-only
1831 # server 2: share 1, read-only
1832 # server 3: share 2, read-only
1833 # server 4: share 3, read-only
1834 # If we set happy = 4, the upload will manage to satisfy
1835 # servers of happiness, but not place all of the shares; we
1836 # want to test that the upload is declared successful in
1838 self._add_server_with_share(server_number=1, share_number=0,
1840 self._add_server_with_share(server_number=2, share_number=1,
1842 self._add_server_with_share(server_number=3, share_number=2,
1844 self._add_server_with_share(server_number=4, share_number=3,
1847 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1848 # Set the client appropriately
1849 c = self.g.clients[0]
1850 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1852 d.addCallback(_server_setup)
1853 d.addCallback(lambda client:
1854 client.upload(upload.Data("data" * 10000, convergence="")))
# The redistribution logic should skip servers holding only one share —
# moving such a share cannot raise happiness.
# NOTE(review): the `for i in xrange(...)` header for the copy loop at the
# end of _server_setup is elided from this excerpt.
1858 def test_uploader_skips_over_servers_with_only_one_share(self):
1859 # We want to make sure that the redistribution logic ignores
1860 # servers with only one share, since placing these shares
1861 # elsewhere will at best keep happiness the same as it was, and
1863 self.basedir = self.mktemp()
1864 d = self._setup_and_upload()
1865 def _server_setup(ign):
1866 # Add some servers so that the upload will need to
1867 # redistribute, but will first pass over a couple of servers
1868 # that don't have enough shares to redistribute before
1869 # finding one that does have shares to redistribute.
1870 self._add_server_with_share(server_number=1, share_number=0)
1871 self._add_server_with_share(server_number=2, share_number=2)
1872 self._add_server_with_share(server_number=3, share_number=1)
1873 self._add_server_with_share(server_number=8, share_number=4)
1874 self._add_server_with_share(server_number=5, share_number=5)
1875 self._add_server_with_share(server_number=10, share_number=7)
1877 self._copy_share_to_server(i, 2)
1878 return self.g.clients[0]
1879 d.addCallback(_server_setup)
1880 d.addCallback(lambda client:
1881 client.upload(upload.Data("data" * 10000, convergence="")))
# After peer selection fails on an unhappy layout (happy=7 on a 5-server
# grid), every allocated bucket must be aborted so allocated_size() drops
# back to 0 on all servers.
# NOTE(review): the expected-message argument of shouldFail, the
# `def _then(ign):` header, and the trailing `return d` appear elided from
# this excerpt.
1885 def test_peer_selector_bucket_abort(self):
1886 # If peer selection for an upload fails due to an unhappy
1887 # layout, the peer selection process should abort the buckets it
1888 # allocates before failing, so that the space can be re-used.
1889 self.basedir = self.mktemp()
1890 self.set_up_grid(num_servers=5)
1892 # Try to upload a file with happy=7, which is unsatisfiable with
1893 # the current grid. This will fail, but should not take up any
1894 # space on the storage servers after it fails.
1895 client = self.g.clients[0]
1896 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
1897 d = defer.succeed(None)
1898 d.addCallback(lambda ignored:
1899 self.shouldFail(UploadUnhappinessError,
1900 "test_peer_selection_bucket_abort",
1902 client.upload, upload.Data("data" * 10000,
1904 # wait for the abort messages to get there.
# fireEventually yields to the reactor once so the (eventual-send) abort
# messages are delivered before we inspect allocated_size.
1905 def _turn_barrier(res):
1906 return fireEventually(res)
1907 d.addCallback(_turn_barrier)
1909 for server in self.g.servers_by_number.values():
1910 self.failUnlessEqual(server.allocated_size(), 0)
1911 d.addCallback(_then)
# Companion to the previous test: when servers die during encoding and the
# layout becomes unhappy, the Encoder must cancel its newly-allocated
# buckets, leaving allocated_size() == 0 everywhere.
# NOTE(review): the expected-message argument of shouldFail, the
# `def _then(ign):` header, and the trailing `return d` appear elided from
# this excerpt.
1915 def test_encoder_bucket_abort(self):
1916 # If enough servers die in the process of encoding and uploading
1917 # a file to make the layout unhappy, we should cancel the
1918 # newly-allocated buckets before dying.
1919 self.basedir = self.mktemp()
1920 self.set_up_grid(num_servers=4)
1922 client = self.g.clients[0]
1923 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
1925 d = defer.succeed(None)
1926 d.addCallback(lambda ignored:
1927 self.shouldFail(UploadUnhappinessError,
1928 "test_encoder_bucket_abort",
1930 self._do_upload_with_broken_servers, 1))
# Yield to the reactor once so the abort messages are delivered before the
# allocated_size checks run.
1931 def _turn_barrier(res):
1932 return fireEventually(res)
1933 d.addCallback(_turn_barrier)
1935 for server in self.g.servers_by_number.values():
1936 self.failUnlessEqual(server.allocated_size(), 0)
1937 d.addCallback(_then)
# Rewrites a client's tahoe.cfg in place, appending a [client] section with
# shares.needed=7 / shares.total=12 on top of the old contents.
# NOTE(review): the tail of this function (presumably further f.write lines,
# f.close(), and a return) is elided from this excerpt, so the open file
# handles (`oldcfg`'s reader and `f`) appear unclosed here — confirm against
# the full source.
1941 def _set_up_nodes_extra_config(self, clientdir):
1942 cfgfn = os.path.join(clientdir, "tahoe.cfg")
1943 oldcfg = open(cfgfn, "r").read()
1944 f = open(cfgfn, "wt")
1947 f.write("[client]\n")
1948 f.write("shares.needed = 7\n")
1949 f.write("shares.total = 12\n")
1955 # upload with exactly 75 peers (shares_of_happiness)
1956 # have a download fail
1957 # cancel a download (need to implement more cancel stuff)