2 from cStringIO import StringIO
3 from twisted.trial import unittest
4 from twisted.python.failure import Failure
5 from twisted.python import log
6 from twisted.internet import defer
7 from foolscap.api import fireEventually
9 import allmydata # for __full_version__
10 from allmydata import uri, monitor, client
11 from allmydata.immutable import upload, encode
12 from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
13 from allmydata.util.assertutil import precondition
14 from allmydata.util.deferredutil import DeferredListShouldSucceed
15 from allmydata.test.no_network import GridTestMixin
16 from allmydata.test.common_util import ShouldFailMixin
17 from allmydata.util.happinessutil import servers_of_happiness, \
18 shares_by_server, merge_peers
19 from allmydata.storage_client import StorageFarmBroker
20 from allmydata.storage.server import storage_index_to_dir
# Helper used as a Deferred callback: unwrap an upload-results object into
# its URI string (body not visible in this chunk -- presumably
# `return results.uri`; TODO confirm against the full file).
24 def extract_uri(results):
27 # Some of these took longer than 480 seconds on Zandr's arm box, but this may
28 # have been due to an earlier test ERROR'ing out due to timeout, which seems
29 # to screw up subsequent tests.
# Unit tests for the IUploadable front-ends (FileHandle, FileName, Data):
# each must report its size, serve sequential read()s, and tolerate close().
32 class Uploadable(unittest.TestCase):
# read() hands back a list of strings; assert every element is a str and
# the concatenation equals `expected` (loop/join lines not visible here).
33 def shouldEqual(self, data, expected):
34 self.failUnless(isinstance(data, list))
36 self.failUnless(isinstance(e, str))
38 self.failUnlessEqual(s, expected)
40 def test_filehandle_random_key(self):
41 return self._test_filehandle(convergence=None)
43 def test_filehandle_convergent_encryption(self):
44 return self._test_filehandle(convergence="some convergence string")
# Walk the uploadable API through upload.FileHandle: get_size() (expected
# to be 41 -- the source data is presumably "a"*41, created on a line not
# visible in this chunk; TODO confirm), then sequential read()s.
46 def _test_filehandle(self, convergence):
48 u = upload.FileHandle(s, convergence=convergence)
50 d.addCallback(self.failUnlessEqual, 41)
51 d.addCallback(lambda res: u.read(1))
52 d.addCallback(self.shouldEqual, "a")
53 d.addCallback(lambda res: u.read(80))
# read(80) past the remaining 40 bytes returns only what is left.
54 d.addCallback(self.shouldEqual, "a"*40)
55 d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
56 d.addCallback(lambda res: s.close()) # that privilege is reserved for us
# Same probe sequence, but reading from a real file via upload.FileName.
59 def test_filename(self):
60 basedir = "upload/Uploadable/test_filename"
62 fn = os.path.join(basedir, "file")
66 u = upload.FileName(fn, convergence=None)
68 d.addCallback(self.failUnlessEqual, 41)
69 d.addCallback(lambda res: u.read(1))
70 d.addCallback(self.shouldEqual, "a")
71 d.addCallback(lambda res: u.read(80))
72 d.addCallback(self.shouldEqual, "a"*40)
73 d.addCallback(lambda res: u.close())
# And once more via upload.Data (the enclosing def line is not visible in
# this chunk).
78 u = upload.Data(s, convergence=None)
80 d.addCallback(self.failUnlessEqual, 41)
81 d.addCallback(lambda res: u.read(1))
82 d.addCallback(self.shouldEqual, "a")
83 d.addCallback(lambda res: u.read(80))
84 d.addCallback(self.shouldEqual, "a"*40)
85 d.addCallback(lambda res: u.close())
# Exception raised by FakeStorageServer to simulate a remote-side failure.
88 class ServerError(Exception):
# SetDEPMixin helper (class header not visible in this chunk): install a
# fresh default-encoding-parameters dict {k, happy, n, max_segment_size}
# on self.node; the dict-literal lines for k/happy/n are not visible here.
92 def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
96 "max_segment_size": max_segsize,
98 self.node.DEFAULT_ENCODING_PARAMETERS = p
# In-process stand-in for a remote storage server.  `mode` selects its
# failure behavior ("first-fail", "second-fail", "full", "already got
# them", ...); callRemote dispatches to local methods, deferred via
# fireEventually (the Deferred-creation line is not visible here) to mimic
# a network round trip.
100 class FakeStorageServer:
101 def __init__(self, mode):
# Normally advertise a 4 GiB maximum share size; the second version dict
# advertises only 10 bytes -- presumably gated on a "small"-style mode by
# a branch not visible in this chunk; TODO confirm.
105 self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
106 { "maximum-immutable-share-size": 2**32 },
107 "application-version": str(allmydata.__full_version__),
110 self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
111 { "maximum-immutable-share-size": 10 },
112 "application-version": str(allmydata.__full_version__),
116 def callRemote(self, methname, *args, **kwargs):
118 meth = getattr(self, methname)
119 return meth(*args, **kwargs)
121 d.addCallback(lambda res: _call())
124 def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
125 sharenums, share_size, canary):
126 #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
# Simulated failures: error on the first or second query, report no room,
# or claim every requested share already exists.
127 if self.mode == "first-fail":
128 if self.queries == 0:
130 if self.mode == "second-fail":
131 if self.queries == 1:
134 if self.mode == "full":
136 elif self.mode == "already got them":
137 return (set(sharenums), {},)
139 for shnum in sharenums:
140 self.allocated.append( (storage_index, shnum) )
# Success path: hand back one FakeBucketWriter per requested share number.
142 dict([( shnum, FakeBucketWriter(share_size) )
143 for shnum in sharenums]),
146 class FakeBucketWriter:
147 # a diagnostic version of storageserver.BucketWriter
# Buffers writes into a StringIO and enforces the BucketWriter contract
# (no writes past self._size, no operations after close) via preconditions.
148 def __init__(self, size):
149 self.data = StringIO()
# Remote calls map onto local remote_* methods, again pushed to a later
# reactor turn to imitate network latency.
153 def callRemote(self, methname, *args, **kwargs):
155 meth = getattr(self, "remote_" + methname)
156 return meth(*args, **kwargs)
158 d.addCallback(lambda res: _call())
# Fire-and-forget variant: issue the call but drop the Deferred.
162 def callRemoteOnly(self, methname, *args, **kwargs):
163 d = self.callRemote(methname, *args, **kwargs)
164 del d # callRemoteOnly ignores this
168 def remote_write(self, offset, data):
169 precondition(not self.closed)
170 precondition(offset >= 0)
171 precondition(offset+len(data) <= self._size,
172 "offset=%d + data=%d > size=%d" %
173 (offset, len(data), self._size))
174 self.data.seek(offset)
175 self.data.write(data)
177 def remote_close(self):
178 precondition(not self.closed)
181 def remote_abort(self):
# Interior of the FakeClient test double (class header not visible in this
# chunk): builds a StorageFarmBroker populated with FakeStorageServers and
# exposes the small client interface the Uploader needs.
185 DEFAULT_ENCODING_PARAMETERS = {"k":25,
188 "max_segment_size": 1*MiB,
190 def __init__(self, mode="good", num_servers=50):
191 self.num_servers = num_servers
# A single string mode applies to every server; otherwise `mode` must be a
# dict mapping server index -> mode string.
192 if type(mode) is str:
193 mode = dict([i,mode] for i in range(num_servers))
# serverids are zero-padded 20-char decimal strings so permutation order
# is deterministic.
194 peers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
195 for fakeid in range(self.num_servers) ]
196 self.storage_broker = StorageFarmBroker(None, permute_peers=True)
197 for (serverid, server) in peers:
198 self.storage_broker.test_add_server(serverid, server)
# Keep the FakeStorageServer objects so tests can inspect .allocated etc.
199 self.last_peers = [p[1] for p in peers]
201 def log(self, *args, **kwargs):
203 def get_encoding_parameters(self):
204 return self.DEFAULT_ENCODING_PARAMETERS
205 def get_storage_broker(self):
206 return self.storage_broker
# Shared at class level: every FakeClient uses the same lease/convergence
# secrets.
207 _secret_holder = client.SecretHolder("lease secret", "convergence secret")
# Raised by GiganticUploadable.read() if the upload machinery reads more
# data than a size-limit test should ever permit.
209 class GotTooFarError(Exception):
# An uploadable that pretends to be `size` bytes of zeros without holding
# them in memory.  If more than ~1MB is actually read, the size-limit check
# under test has failed to trip, so abort the test with GotTooFarError.
212 class GiganticUploadable(upload.FileHandle):
213 def __init__(self, size):
217 def get_encryption_key(self):
# Fixed all-zeros key; key quality is irrelevant to these tests.
218 return defer.succeed("\x00" * 16)
220 return defer.succeed(self._size)
221 def read(self, length):
# Never return more than the bytes remaining before self._size.
222 left = self._size - self._fp
223 length = min(left, length)
225 if self._fp > 1000000:
226 # terminate the test early.
227 raise GotTooFarError("we shouldn't be allowed to get this far")
228 return defer.succeed(["\x00" * length])
233 Once upon a time, there was a beautiful princess named Buttercup. She lived
234 in a magical land where every file was stored securely among millions of
235 machines, and nobody ever worried about their data being lost ever again.
# Sanity check: DATA must be large enough to force a real CHK upload rather
# than an inline LIT URI.  SIZE_ZERO/SIZE_SMALL are defined on lines not
# visible in this chunk.
238 assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
242 SIZE_LARGE = len(DATA)
def upload_data(uploader, data):
    """Upload the byte-string *data* via *uploader* using a random key."""
    return uploader.upload(upload.Data(data, convergence=None))
def upload_filename(uploader, filename):
    """Upload the file at *filename* via *uploader* using a random key."""
    return uploader.upload(upload.FileName(filename, convergence=None))
def upload_filehandle(uploader, fh):
    """Upload the open file-like object *fh* via *uploader* using a random key."""
    return uploader.upload(upload.FileHandle(fh, convergence=None))
# Uploads against a grid of uniformly well-behaved fake servers: small
# inputs must become LIT URIs, large ones CHK URIs, and oversized ones must
# be rejected.  (The setUp `def` line is not visible in this chunk.)
254 class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
256 self.node = FakeClient(mode="good")
257 self.u = upload.Uploader()
258 self.u.running = True
259 self.u.parent = self.node
# Assert `newuri` parses as a LIT URI holding exactly `size` bytes.
261 def _check_small(self, newuri, size):
262 u = uri.from_string(newuri)
263 self.failUnless(isinstance(u, uri.LiteralFileURI))
264 self.failUnlessEqual(len(u.data), size)
# Assert `newuri` parses as a CHK URI with well-formed SI/key and `size`.
266 def _check_large(self, newuri, size):
267 u = uri.from_string(newuri)
268 self.failUnless(isinstance(u, uri.CHKFileURI))
269 self.failUnless(isinstance(u.get_storage_index(), str))
270 self.failUnlessEqual(len(u.get_storage_index()), 16)
271 self.failUnless(isinstance(u.key, str))
272 self.failUnlessEqual(len(u.key), 16)
273 self.failUnlessEqual(u.size, size)
275 def get_data(self, size):
278 def test_too_large(self):
279 # we've removed the 4GiB share size limit (see ticket #346 for
280 # details), but still have an 8-byte field, so the limit is now
281 # 2**64, so make sure we reject files larger than that.
282 k = 3; happy = 7; n = 10
283 self.set_encoding_parameters(k, happy, n)
285 data1 = GiganticUploadable(big)
286 d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
287 "This file is too large to be uploaded (data_size)",
288 self.u.upload, data1)
289 data2 = GiganticUploadable(big-3)
290 d.addCallback(lambda res:
291 self.shouldFail(FileTooLargeError,
292 "test_too_large-data2",
293 "This file is too large to be uploaded (offsets)",
294 self.u.upload, data2))
295 # I don't know where the actual limit is.. it depends upon how large
296 # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
299 def test_data_zero(self):
300 data = self.get_data(SIZE_ZERO)
301 d = upload_data(self.u, data)
302 d.addCallback(extract_uri)
303 d.addCallback(self._check_small, SIZE_ZERO)
306 def test_data_small(self):
307 data = self.get_data(SIZE_SMALL)
308 d = upload_data(self.u, data)
309 d.addCallback(extract_uri)
310 d.addCallback(self._check_small, SIZE_SMALL)
313 def test_data_large(self):
314 data = self.get_data(SIZE_LARGE)
315 d = upload_data(self.u, data)
316 d.addCallback(extract_uri)
317 d.addCallback(self._check_large, SIZE_LARGE)
# Same as test_data_large, but with a segment size that divides the file
# into a non-power-of-two number of segments.
320 def test_data_large_odd_segments(self):
321 data = self.get_data(SIZE_LARGE)
322 segsize = int(SIZE_LARGE / 2.5)
323 # we want 3 segments, since that's not a power of two
324 self.set_encoding_parameters(25, 25, 100, segsize)
325 d = upload_data(self.u, data)
326 d.addCallback(extract_uri)
327 d.addCallback(self._check_large, SIZE_LARGE)
330 def test_filehandle_zero(self):
331 data = self.get_data(SIZE_ZERO)
332 d = upload_filehandle(self.u, StringIO(data))
333 d.addCallback(extract_uri)
334 d.addCallback(self._check_small, SIZE_ZERO)
337 def test_filehandle_small(self):
338 data = self.get_data(SIZE_SMALL)
339 d = upload_filehandle(self.u, StringIO(data))
340 d.addCallback(extract_uri)
341 d.addCallback(self._check_small, SIZE_SMALL)
344 def test_filehandle_large(self):
345 data = self.get_data(SIZE_LARGE)
346 d = upload_filehandle(self.u, StringIO(data))
347 d.addCallback(extract_uri)
348 d.addCallback(self._check_large, SIZE_LARGE)
# The filename variants write `data` to a local file first (the write
# lines are not visible in this chunk) and upload by path.
351 def test_filename_zero(self):
352 fn = "Uploader-test_filename_zero.data"
354 data = self.get_data(SIZE_ZERO)
357 d = upload_filename(self.u, fn)
358 d.addCallback(extract_uri)
359 d.addCallback(self._check_small, SIZE_ZERO)
362 def test_filename_small(self):
363 fn = "Uploader-test_filename_small.data"
365 data = self.get_data(SIZE_SMALL)
368 d = upload_filename(self.u, fn)
369 d.addCallback(extract_uri)
370 d.addCallback(self._check_small, SIZE_SMALL)
373 def test_filename_large(self):
374 fn = "Uploader-test_filename_large.data"
376 data = self.get_data(SIZE_LARGE)
379 d = upload_filename(self.u, fn)
380 d.addCallback(extract_uri)
381 d.addCallback(self._check_large, SIZE_LARGE)
# Uploads against grids where some/all servers raise ServerError on their
# first or second allocate_buckets query; verifies that peer selection
# either routes around the failures or fails with a useful message.
384 class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
385 def make_node(self, mode, num_servers=10):
386 self.node = FakeClient(mode, num_servers)
387 self.u = upload.Uploader()
388 self.u.running = True
389 self.u.parent = self.node
391 def _check_large(self, newuri, size):
392 u = uri.from_string(newuri)
393 self.failUnless(isinstance(u, uri.CHKFileURI))
394 self.failUnless(isinstance(u.get_storage_index(), str))
395 self.failUnlessEqual(len(u.get_storage_index()), 16)
396 self.failUnless(isinstance(u.key, str))
397 self.failUnlessEqual(len(u.key), 16)
398 self.failUnlessEqual(u.size, size)
# One good server plus nine that fail their first query: the upload should
# still succeed (all shares land on server 0).
400 def test_first_error(self):
401 mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
403 self.set_encoding_parameters(k=25, happy=1, n=50)
404 d = upload_data(self.u, DATA)
405 d.addCallback(extract_uri)
406 d.addCallback(self._check_large, SIZE_LARGE)
# Every server fails the first query: upload must fail, and the error text
# must include the placement count and the last server-side failure.
409 def test_first_error_all(self):
410 self.make_node("first-fail")
411 d = self.shouldFail(UploadUnhappinessError, "first_error_all",
412 "peer selection failed",
413 upload_data, self.u, DATA)
415 self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
416 # there should also be a 'last failure was' message
417 self.failUnlessIn("ServerError", str(f.value))
418 d.addCallback(_check)
421 def test_second_error(self):
422 # we want to make sure we make it to a third pass. This means that
423 # the first pass was insufficient to place all shares, and at least
424 # one of second pass servers (other than the last one) accepted a
425 # share (so we'll believe that a third pass will be useful). (if
426 # everyone but the last server throws an error, then we'll send all
427 # the remaining shares to the last server at the end of the second
428 # pass, and if that succeeds, we won't make it to a third pass).
430 # we can achieve this 97.5% of the time by using 40 servers, having
431 # 39 of them fail on the second request, leaving only one to succeed
432 # on the second request. (we need to keep the number of servers low
433 # enough to ensure a second pass with 100 shares).
434 mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
435 self.make_node(mode, 40)
436 d = upload_data(self.u, DATA)
437 d.addCallback(extract_uri)
438 d.addCallback(self._check_large, SIZE_LARGE)
# Every server fails the second query: only the first pass's placements
# (10 shares, one per server) survive, and the upload must fail.
441 def test_second_error_all(self):
442 self.make_node("second-fail")
443 d = self.shouldFail(UploadUnhappinessError, "second_error_all",
444 "peer selection failed",
445 upload_data, self.u, DATA)
447 self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
448 # there should also be a 'last failure was' message
449 self.failUnlessIn("ServerError", str(f.value))
450 d.addCallback(_check)
# Every server reports itself full: a large upload must fail with
# UploadUnhappinessError.  (The setUp `def` line is not visible here.)
453 class FullServer(unittest.TestCase):
455 self.node = FakeClient(mode="full")
456 self.u = upload.Uploader()
457 self.u.running = True
458 self.u.parent = self.node
# addBoth target: we expect a Failure wrapping UploadUnhappinessError.
460 def _should_fail(self, f):
461 self.failUnless(isinstance(f, Failure) and f.check(UploadUnhappinessError), f)
463 def test_data_large(self):
465 d = upload_data(self.u, data)
466 d.addBoth(self._should_fail)
# Verifies the share-distribution policy of peer selection: how N shares
# spread across a given number of willing (or size-limited) servers, and
# how many allocate queries each server should receive.
469 class PeerSelection(unittest.TestCase):
471 def make_client(self, num_servers=50):
472 self.node = FakeClient(mode="good", num_servers=num_servers)
473 self.u = upload.Uploader()
474 self.u.running = True
475 self.u.parent = self.node
477 def get_data(self, size):
480 def _check_large(self, newuri, size):
481 u = uri.from_string(newuri)
482 self.failUnless(isinstance(u, uri.CHKFileURI))
483 self.failUnless(isinstance(u.get_storage_index(), str))
484 self.failUnlessEqual(len(u.get_storage_index()), 16)
485 self.failUnless(isinstance(u.key, str))
486 self.failUnlessEqual(len(u.key), 16)
487 self.failUnlessEqual(u.size, size)
# Local duplicate of SetDEPMixin.set_encoding_parameters (dict-literal
# lines for k/happy/n not visible in this chunk).
489 def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
493 "max_segment_size": max_segsize,
495 self.node.DEFAULT_ENCODING_PARAMETERS = p
497 def test_one_each(self):
498 # if we have 50 shares, and there are 50 peers, and they all accept a
499 # share, we should get exactly one share per peer
502 data = self.get_data(SIZE_LARGE)
503 self.set_encoding_parameters(25, 30, 50)
504 d = upload_data(self.u, data)
505 d.addCallback(extract_uri)
506 d.addCallback(self._check_large, SIZE_LARGE)
508 for p in self.node.last_peers:
509 allocated = p.allocated
510 self.failUnlessEqual(len(allocated), 1)
511 self.failUnlessEqual(p.queries, 1)
512 d.addCallback(_check)
515 def test_two_each(self):
516 # if we have 100 shares, and there are 50 peers, and they all accept
517 # all shares, we should get exactly two shares per peer
520 data = self.get_data(SIZE_LARGE)
521 # if there are 50 peers, then happy needs to be <= 50
522 self.set_encoding_parameters(50, 50, 100)
523 d = upload_data(self.u, data)
524 d.addCallback(extract_uri)
525 d.addCallback(self._check_large, SIZE_LARGE)
527 for p in self.node.last_peers:
528 allocated = p.allocated
529 self.failUnlessEqual(len(allocated), 2)
530 self.failUnlessEqual(p.queries, 2)
531 d.addCallback(_check)
534 def test_one_each_plus_one_extra(self):
535 # if we have 51 shares, and there are 50 peers, then one peer gets
536 # two shares and the rest get just one
539 data = self.get_data(SIZE_LARGE)
540 self.set_encoding_parameters(24, 41, 51)
541 d = upload_data(self.u, data)
542 d.addCallback(extract_uri)
543 d.addCallback(self._check_large, SIZE_LARGE)
# Tally which peers got 1 vs 2 shares (got_one/got_two accumulation lines
# not visible in this chunk).
547 for p in self.node.last_peers:
548 allocated = p.allocated
549 self.failUnless(len(allocated) in (1,2), len(allocated))
550 if len(allocated) == 1:
551 self.failUnlessEqual(p.queries, 1)
554 self.failUnlessEqual(p.queries, 2)
556 self.failUnlessEqual(len(got_one), 49)
557 self.failUnlessEqual(len(got_two), 1)
558 d.addCallback(_check)
561 def test_four_each(self):
562 # if we have 200 shares, and there are 50 peers, then each peer gets
563 # 4 shares. The design goal is to accomplish this with only two
567 data = self.get_data(SIZE_LARGE)
568 # if there are 50 peers, then happy should be no more than 50 if
569 # we want this to work.
570 self.set_encoding_parameters(100, 50, 200)
571 d = upload_data(self.u, data)
572 d.addCallback(extract_uri)
573 d.addCallback(self._check_large, SIZE_LARGE)
575 for p in self.node.last_peers:
576 allocated = p.allocated
577 self.failUnlessEqual(len(allocated), 4)
578 self.failUnlessEqual(p.queries, 2)
579 d.addCallback(_check)
582 def test_three_of_ten(self):
583 # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
587 data = self.get_data(SIZE_LARGE)
588 self.set_encoding_parameters(3, 3, 10)
589 d = upload_data(self.u, data)
590 d.addCallback(extract_uri)
591 d.addCallback(self._check_large, SIZE_LARGE)
# Build a histogram of shares-per-server; expect two servers with 3 and
# one with 4.
594 for p in self.node.last_peers:
595 allocated = p.allocated
596 counts[len(allocated)] = counts.get(len(allocated), 0) + 1
597 histogram = [counts.get(i, 0) for i in range(5)]
598 self.failUnlessEqual(histogram, [0,0,0,2,1])
599 d.addCallback(_check)
602 def test_some_big_some_small(self):
603 # 10 shares, 20 servers, but half the servers don't support a
604 # share-size large enough for our file
605 mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)])
606 self.node = FakeClient(mode, num_servers=20)
607 self.u = upload.Uploader()
608 self.u.running = True
609 self.u.parent = self.node
611 data = self.get_data(SIZE_LARGE)
612 self.set_encoding_parameters(3, 5, 10)
613 d = upload_data(self.u, data)
614 d.addCallback(extract_uri)
615 d.addCallback(self._check_large, SIZE_LARGE)
617 # we should have put one share each on the big peers, and zero
618 # shares on the small peers
620 for p in self.node.last_peers:
622 self.failUnlessEqual(len(p.allocated), 1)
623 elif p.mode == "small":
624 self.failUnlessEqual(len(p.allocated), 0)
625 total_allocated += len(p.allocated)
626 self.failUnlessEqual(total_allocated, 10)
627 d.addCallback(_check)
# The storage index must be a pure function of (plaintext, convergence
# secret, encoding parameters) for CHK, and fresh/random when no
# convergence secret is supplied.
631 class StorageIndex(unittest.TestCase):
632 def test_params_must_matter(self):
633 DATA = "I am some data"
634 u = upload.Data(DATA, convergence="")
635 eu = upload.EncryptAnUploadable(u)
636 d1 = eu.get_storage_index()
638 # CHK means the same data should encrypt the same way
639 u = upload.Data(DATA, convergence="")
640 eu = upload.EncryptAnUploadable(u)
641 d1a = eu.get_storage_index()
643 # but if we use a different convergence string it should be different
644 u = upload.Data(DATA, convergence="wheee!")
645 eu = upload.EncryptAnUploadable(u)
646 d1salt1 = eu.get_storage_index()
648 # and if we add yet a different convergence it should be different again
649 u = upload.Data(DATA, convergence="NOT wheee!")
650 eu = upload.EncryptAnUploadable(u)
651 d1salt2 = eu.get_storage_index()
653 # and if we use the first string again it should be the same as last time
654 u = upload.Data(DATA, convergence="wheee!")
655 eu = upload.EncryptAnUploadable(u)
656 d1salt1a = eu.get_storage_index()
658 # and if we change the encoding parameters, it should be different (from the same convergence string with different encoding parameters)
659 u = upload.Data(DATA, convergence="")
660 u.encoding_param_k = u.default_encoding_param_k + 1
661 eu = upload.EncryptAnUploadable(u)
662 d2 = eu.get_storage_index()
664 # and if we use a random key, it should be different than the CHK
665 u = upload.Data(DATA, convergence=None)
666 eu = upload.EncryptAnUploadable(u)
667 d3 = eu.get_storage_index()
668 # and different from another instance
669 u = upload.Data(DATA, convergence=None)
670 eu = upload.EncryptAnUploadable(u)
671 d4 = eu.get_storage_index()
# Wait for all eight Deferreds, then compare the resulting indexes
# pairwise according to the expectations above.
673 d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
675 si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
676 self.failUnlessEqual(si1, si1a)
677 self.failIfEqual(si1, si2)
678 self.failIfEqual(si1, si3)
679 self.failIfEqual(si1, si4)
680 self.failIfEqual(si3, si4)
681 self.failIfEqual(si1salt1, si1)
682 self.failIfEqual(si1salt1, si1salt2)
683 self.failIfEqual(si1salt2, si1)
684 self.failUnlessEqual(si1salt1, si1salt1a)
688 class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
690 def _do_upload_with_broken_servers(self, servers_to_break):
692 I act like a normal upload, but before I send the results of
693 Tahoe2PeerSelector to the Encoder, I break the first servers_to_break
694 PeerTrackers in the used_peers part of the return result.
696 assert self.g, "I tried to find a grid at self.g, but failed"
# Drive the upload pipeline by hand: encrypt, run peer selection, then
# sabotage the chosen peers before handing shareholders to the Encoder.
697 broker = self.g.clients[0].storage_broker
698 sh = self.g.clients[0]._secret_holder
699 data = upload.Data("data" * 10000, convergence="")
700 data.encoding_param_k = 3
701 data.encoding_param_happy = 4
702 data.encoding_param_n = 10
703 uploadable = upload.EncryptAnUploadable(data)
704 encoder = encode.Encoder()
705 encoder.set_encrypted_uploadable(uploadable)
706 status = upload.UploadStatus()
707 selector = upload.Tahoe2PeerSelector("dglev", "test", status)
708 storage_index = encoder.get_param("storage_index")
709 share_size = encoder.get_param("share_size")
710 block_size = encoder.get_param("block_size")
711 num_segments = encoder.get_param("num_segments")
712 d = selector.get_shareholders(broker, sh, storage_index,
713 share_size, block_size, num_segments,
# "Break" a server by aborting every bucket it was allocated.
715 def _have_shareholders((used_peers, already_peers)):
716 assert servers_to_break <= len(used_peers)
717 for index in xrange(servers_to_break):
718 server = list(used_peers)[index]
719 for share in server.buckets.keys():
720 server.buckets[share].abort()
# Rebuild the bucket/servermap structures the Encoder expects.
722 servermap = already_peers.copy()
723 for peer in used_peers:
724 buckets.update(peer.buckets)
725 for bucket in peer.buckets:
726 servermap.setdefault(bucket, set()).add(peer.peerid)
727 encoder.set_shareholders(buckets, servermap)
730 d.addCallback(_have_shareholders)
# Create a new storage server in the NoNetworkGrid and register it.
734 def _add_server(self, server_number, readonly=False):
735 assert self.g, "I tried to find a grid at self.g, but failed"
736 ss = self.g.make_server(server_number, readonly)
737 self.g.add_server(server_number, ss)
# Convenience: add a server and optionally seed it with one existing share
# (the signature's continuation line is not visible in this chunk).
740 def _add_server_with_share(self, server_number, share_number=None,
742 self._add_server(server_number, readonly)
743 if share_number is not None:
744 self._copy_share_to_server(share_number, server_number)
747 def _copy_share_to_server(self, share_number, server_number):
748 ss = self.g.servers_by_number[server_number]
749 # Copy share i from the directory associated with the first
750 # storage server to the directory associated with this one.
751 assert self.g, "I tried to find a grid at self.g, but failed"
752 assert self.shares, "I tried to find shares at self.shares, but failed"
# self.shares entries are (sharenum, serverid, path) tuples; index 2 is
# the on-disk location of the share file.
753 old_share_location = self.shares[share_number][2]
754 new_share_location = os.path.join(ss.storedir, "shares")
755 si = uri.from_string(self.uri).get_storage_index()
756 new_share_location = os.path.join(new_share_location,
757 storage_index_to_dir(si))
758 if not os.path.exists(new_share_location):
759 os.makedirs(new_share_location)
760 new_share_location = os.path.join(new_share_location,
762 if old_share_location != new_share_location:
763 shutil.copy(old_share_location, new_share_location)
764 shares = self.find_shares(self.uri)
765 # Make sure that the storage server has the share.
766 self.failUnless((share_number, ss.my_nodeid, new_share_location)
769 def _setup_grid(self):
771 I set up a NoNetworkGrid with a single server and client.
773 self.set_up_grid(num_clients=1, num_servers=1)
775 def _setup_and_upload(self, **kwargs):
777 I set up a NoNetworkGrid with a single server and client,
778 upload a file to it, store its uri in self.uri, and store its
779 sharedata in self.shares.
# Use happy=1 so the single-server upload succeeds; optional k/n kwargs
# override the client's default encoding parameters.
782 client = self.g.clients[0]
783 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
784 if "n" in kwargs and "k" in kwargs:
785 client.DEFAULT_ENCODING_PARAMETERS['k'] = kwargs['k']
786 client.DEFAULT_ENCODING_PARAMETERS['n'] = kwargs['n']
787 data = upload.Data("data" * 10000, convergence="")
789 d = client.upload(data)
792 d.addCallback(_store_uri)
793 d.addCallback(lambda ign:
794 self.find_shares(self.uri))
795 def _store_shares(shares):
797 d.addCallback(_store_shares)
# Verify that tahoe.cfg-style encoding overrides (installed via the
# _set_up_nodes_extra_config hook) actually take effect: the uploaded
# file must report k=7 needed and n=12 expected shares.
801 def test_configure_parameters(self):
802 self.basedir = self.mktemp()
803 hooks = {0: self._set_up_nodes_extra_config}
804 self.set_up_grid(client_config_hooks=hooks)
805 c0 = self.g.clients[0]
808 u = upload.Data(DATA, convergence="")
810 d.addCallback(lambda ur: c0.create_node_from_uri(ur.uri))
811 m = monitor.Monitor()
812 d.addCallback(lambda fn: fn.check(m))
815 self.failUnlessEqual(data["count-shares-needed"], 7)
816 self.failUnlessEqual(data["count-shares-expected"], 12)
817 d.addCallback(_check)
821 def _setUp(self, ns):
822 # Used by test_happy_semantics and test_preexisting_share_behavior
823 # to set up the grid.
# `ns` is the number of fake servers to populate the FakeClient with.
824 self.node = FakeClient(mode="good", num_servers=ns)
825 self.u = upload.Uploader()
826 self.u.running = True
827 self.u.parent = self.node
# servers-of-happiness semantics: k=3/happy=5/n=10 must fail on a 2-server
# grid with a specific explanatory message, then succeed once the grid is
# reset (via _setUp, on lines not visible here) to 10 and to 7 servers.
830 def test_happy_semantics(self):
832 DATA = upload.Data("kittens" * 10000, convergence="")
833 # These parameters are unsatisfiable with only 2 servers.
834 self.set_encoding_parameters(k=3, happy=5, n=10)
835 d = self.shouldFail(UploadUnhappinessError, "test_happy_semantics",
836 "shares could be placed or found on only 2 "
837 "server(s). We were asked to place shares on "
838 "at least 5 server(s) such that any 3 of them "
839 "have enough shares to recover the file",
841 # Let's reset the client to have 10 servers
842 d.addCallback(lambda ign:
844 # These parameters are satisfiable with 10 servers.
845 d.addCallback(lambda ign:
846 self.set_encoding_parameters(k=3, happy=5, n=10))
847 d.addCallback(lambda ign:
849 # Let's reset the client to have 7 servers
850 # (this is less than n, but more than h)
851 d.addCallback(lambda ign:
853 # These parameters are satisfiable with 7 servers.
854 d.addCallback(lambda ign:
855 self.set_encoding_parameters(k=3, happy=5, n=10))
856 d.addCallback(lambda ign:
# A failed upload must abort its BucketWriters: after an unhappy upload on
# 4 servers, adding a 5th server should let a retry succeed (which it
# could not if the first 4 were stuck holding open buckets).
860 def test_aborted_shares(self):
861 self.basedir = "upload/EncodingParameters/aborted_shares"
862 self.set_up_grid(num_servers=4)
863 c = self.g.clients[0]
864 DATA = upload.Data(100* "kittens", convergence="")
865 # These parameters are unsatisfiable with only 4 servers, but should
866 # work with 5, as long as the original 4 are not stuck in the open
867 # BucketWriter state (open() but not
868 parms = {"k":2, "happy":5, "n":5, "max_segment_size": 1*MiB}
869 c.DEFAULT_ENCODING_PARAMETERS = parms
870 d = self.shouldFail(UploadUnhappinessError, "test_aborted_shares",
871 "shares could be placed on only 4 "
872 "server(s) such that any 2 of them have enough "
873 "shares to recover the file, but we were asked "
874 "to place shares on at least 5 such servers",
876 # now add the 5th server
877 d.addCallback(lambda ign: self._add_server(4, False))
878 # and this time the upload ought to succeed
879 d.addCallback(lambda ign: c.upload(DATA))
883 def test_problem_layout_comment_52(self):
885 self.basedir = self.mktemp()
887 # This scenario is at
888 # http://allmydata.org/trac/tahoe/ticket/778#comment:52
890 # The scenario in comment:52 proposes that we have a layout
892 # server 0: shares 1 - 9
893 # server 1: share 0, read-only
894 # server 2: share 0, read-only
895 # server 3: share 0, read-only
896 # To get access to the shares, we will first upload to one
897 # server, which will then have shares 0 - 9. We'll then
898 # add three new servers, configure them to not accept any new
899 # shares, then write share 0 directly into the serverdir of each,
900 # and then remove share 0 from server 0 in the same way.
901 # Then each of servers 1 - 3 will report that they have share 0,
902 # and will not accept any new share, while server 0 will report that
903 # it has shares 1 - 9 and will accept new shares.
904 # We'll then set 'happy' = 4, and see that an upload fails
906 d = self._setup_and_upload()
907 d.addCallback(lambda ign:
908 self._add_server_with_share(server_number=1, share_number=0,
910 d.addCallback(lambda ign:
911 self._add_server_with_share(server_number=2, share_number=0,
913 d.addCallback(lambda ign:
914 self._add_server_with_share(server_number=3, share_number=0,
916 # Remove the first share from server 0.
917 def _remove_share_0_from_server_0():
918 share_location = self.shares[0][2]
919 os.remove(share_location)
920 d.addCallback(lambda ign:
921 _remove_share_0_from_server_0())
922 # Set happy = 4 in the client.
924 client = self.g.clients[0]
925 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
927 d.addCallback(lambda ign:
929 # Uploading data should fail
930 d.addCallback(lambda client:
931 self.shouldFail(UploadUnhappinessError,
932 "test_problem_layout_comment_52_test_1",
933 "shares could be placed or found on 4 server(s), "
934 "but they are not spread out evenly enough to "
935 "ensure that any 3 of these servers would have "
936 "enough shares to recover the file. "
937 "We were asked to place shares on at "
938 "least 4 servers such that any 3 of them have "
939 "enough shares to recover the file",
940 client.upload, upload.Data("data" * 10000,
943 # Do comment:52, but like this:
945 # server 3: share 0, read-only
946 # server 1: share 0, read-only
947 # server 0: shares 0-9
# Second variant: rebuild the grid from scratch (teardown/basedir lines
# not visible in this chunk) with server 0 keeping all of its shares.
948 d.addCallback(lambda ign:
950 d.addCallback(lambda ign:
951 self._setup_and_upload())
952 d.addCallback(lambda ign:
953 self._add_server(server_number=2))
954 d.addCallback(lambda ign:
955 self._add_server_with_share(server_number=3, share_number=0,
957 d.addCallback(lambda ign:
958 self._add_server_with_share(server_number=1, share_number=0,
961 client = self.g.clients[0]
962 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
964 d.addCallback(lambda ign:
966 d.addCallback(lambda client:
967 self.shouldFail(UploadUnhappinessError,
968 "test_problem_layout_comment_52_test_2",
969 "shares could be placed on only 3 server(s) such "
970 "that any 3 of them have enough shares to recover "
971 "the file, but we were asked to place shares on "
972 "at least 4 such servers.",
973 client.upload, upload.Data("data" * 10000,
978 def test_problem_layout_comment_53(self):
979 # This scenario is at
980 # http://allmydata.org/trac/tahoe/ticket/778#comment:53
982 # Set up the grid to have one server
983 def _change_basedir(ign):
984 self.basedir = self.mktemp()
985 _change_basedir(None)
986 # We start by uploading all of the shares to one server.
987 # Next, we'll add three new servers to our NoNetworkGrid. We'll add
988 # one share from our initial upload to each of these.
989 # The counterintuitive ordering of the share numbers is to deal with
990 # the permuting of these servers -- distributing the shares this
991 # way ensures that the Tahoe2PeerSelector sees them in the order
993 d = self._setup_and_upload()
994 d.addCallback(lambda ign:
995 self._add_server_with_share(server_number=1, share_number=2))
996 d.addCallback(lambda ign:
997 self._add_server_with_share(server_number=2, share_number=0))
998 d.addCallback(lambda ign:
999 self._add_server_with_share(server_number=3, share_number=1))
1000 # So, we now have the following layout:
1001 # server 0: shares 0 - 9
1005 # We change the 'happy' parameter in the client to 4.
1006 # The Tahoe2PeerSelector will see the peers permuted as:
1008 # Ideally, a reupload of our original data should work.
1009 def _reset_encoding_parameters(ign, happy=4):
1010 client = self.g.clients[0]
1011 client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1013 d.addCallback(_reset_encoding_parameters)
1014 d.addCallback(lambda client:
1015 client.upload(upload.Data("data" * 10000, convergence="")))
1018 # This scenario is basically comment:53, but changed so that the
1019 # Tahoe2PeerSelector sees the server with all of the shares before
1020 # any of the other servers.
1022 # server 2: shares 0 - 9
1026 # The Tahoe2PeerSelector sees the peers permuted as:
1028 # Note that server 0 has been replaced by server 4; this makes it
1029 # easier to ensure that the last server seen by Tahoe2PeerSelector
1030 # has only one share.
1031 d.addCallback(_change_basedir)
1032 d.addCallback(lambda ign:
1033 self._setup_and_upload())
1034 d.addCallback(lambda ign:
1035 self._add_server_with_share(server_number=2, share_number=0))
1036 d.addCallback(lambda ign:
1037 self._add_server_with_share(server_number=3, share_number=1))
1038 d.addCallback(lambda ign:
1039 self._add_server_with_share(server_number=1, share_number=2))
1040 # Copy all of the other shares to server number 2
1041 def _copy_shares(ign):
1042 for i in xrange(0, 10):
1043 self._copy_share_to_server(i, 2)
1044 d.addCallback(_copy_shares)
1045 # Remove the first server, and add a placeholder with share 0
1046 d.addCallback(lambda ign:
1047 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1048 d.addCallback(lambda ign:
1049 self._add_server_with_share(server_number=4, share_number=0))
1050 # Now try uploading.
1051 d.addCallback(_reset_encoding_parameters)
1052 d.addCallback(lambda client:
1053 client.upload(upload.Data("data" * 10000, convergence="")))
1056 # Try the same thing, but with empty servers after the first one
1057 # We want to make sure that Tahoe2PeerSelector will redistribute
1058 # shares as necessary, not simply discover an existing layout.
1060 # server 2: shares 0 - 9
1064 d.addCallback(_change_basedir)
1065 d.addCallback(lambda ign:
1066 self._setup_and_upload())
1067 d.addCallback(lambda ign:
1068 self._add_server(server_number=2))
1069 d.addCallback(lambda ign:
1070 self._add_server(server_number=3))
1071 d.addCallback(lambda ign:
1072 self._add_server(server_number=1))
1073 d.addCallback(lambda ign:
1074 self._add_server(server_number=4))
1075 d.addCallback(_copy_shares)
1076 d.addCallback(lambda ign:
1077 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1078 d.addCallback(_reset_encoding_parameters)
1079 d.addCallback(lambda client:
1080 client.upload(upload.Data("data" * 10000, convergence="")))
1081 # Make sure that only as many shares as necessary to satisfy
1082 # servers of happiness were pushed.
1083 d.addCallback(lambda results:
1084 self.failUnlessEqual(results.pushed_shares, 3))
1088 def test_happiness_with_some_readonly_peers(self):
# One writable server holds all shares; three read-only servers each
# hold one share. The upload (happy=4, set below) should still
# succeed because the read-only servers' existing shares count
# toward servers-of-happiness.
# NOTE(review): several continuation lines (the `readonly=True`
# arguments of the multi-line _add_server_with_share calls, and the
# final `return d`) fall on line numbers missing from this excerpt.
1089 # Try the following layout
1090 # server 2: shares 0-9
1091 # server 4: share 0, read-only
1092 # server 3: share 1, read-only
1093 # server 1: share 2, read-only
1094 self.basedir = self.mktemp()
1095 d = self._setup_and_upload()
1096 d.addCallback(lambda ign:
1097 self._add_server_with_share(server_number=2, share_number=0))
1098 d.addCallback(lambda ign:
1099 self._add_server_with_share(server_number=3, share_number=1,
1101 d.addCallback(lambda ign:
1102 self._add_server_with_share(server_number=1, share_number=2,
1104 # Copy all of the other shares to server number 2
1105 def _copy_shares(ign):
1106 for i in xrange(1, 10):
1107 self._copy_share_to_server(i, 2)
1108 d.addCallback(_copy_shares)
1109 # Remove server 0, and add another in its place
1110 d.addCallback(lambda ign:
1111 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1112 d.addCallback(lambda ign:
1113 self._add_server_with_share(server_number=4, share_number=0,
1115 def _reset_encoding_parameters(ign, happy=4):
# Returns the client so the next callback can upload through it.
1116 client = self.g.clients[0]
1117 client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1119 d.addCallback(_reset_encoding_parameters)
1120 d.addCallback(lambda client:
1121 client.upload(upload.Data("data" * 10000, convergence="")))
1125 def test_happiness_with_all_readonly_peers(self):
# Same shape as the previous test, but *every* server is read-only.
# Verifies that the survey of read-only peers does not undercount
# servers-of-happiness when no new shares can be written anywhere.
# NOTE(review): the `readonly=True` continuation lines of the
# multi-line calls and the trailing `return d` sit on line numbers
# missing from this excerpt -- confirm against the full file.
1126 # server 3: share 1, read-only
1127 # server 1: share 2, read-only
1128 # server 2: shares 0-9, read-only
1129 # server 4: share 0, read-only
1130 # The idea with this test is to make sure that the survey of
1131 # read-only peers doesn't undercount servers of happiness
1132 self.basedir = self.mktemp()
1133 d = self._setup_and_upload()
1134 d.addCallback(lambda ign:
1135 self._add_server_with_share(server_number=4, share_number=0,
1137 d.addCallback(lambda ign:
1138 self._add_server_with_share(server_number=3, share_number=1,
1140 d.addCallback(lambda ign:
1141 self._add_server_with_share(server_number=1, share_number=2,
1143 d.addCallback(lambda ign:
1144 self._add_server_with_share(server_number=2, share_number=0,
1146 def _copy_shares(ign):
# Shares 1-9 all go to server 2, which then holds the full set.
1147 for i in xrange(1, 10):
1148 self._copy_share_to_server(i, 2)
1149 d.addCallback(_copy_shares)
1150 d.addCallback(lambda ign:
1151 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1152 def _reset_encoding_parameters(ign, happy=4):
1153 client = self.g.clients[0]
1154 client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1156 d.addCallback(_reset_encoding_parameters)
1157 d.addCallback(lambda client:
1158 client.upload(upload.Data("data" * 10000, convergence="")))
1162 def test_dropped_servers_in_encoder(self):
# Exercises the Encoder's own servers-of-happiness re-check: peers
# are broken *after* peer selection but *before* shares are written.
# Losing 1 of 5 servers (happy=4) must succeed; losing 2 must raise
# UploadUnhappinessError. Then the same pair of scenarios is run
# with a mix of read-only and breakable servers.
# NOTE(review): this excerpt's embedded line numbers are
# non-contiguous; parts of _do_server_setup_2 and the trailing
# `return d` are not visible here.
1163 # The Encoder does its own "servers_of_happiness" check if it
1164 # happens to lose a bucket during an upload (it assumes that
1165 # the layout presented to it satisfies "servers_of_happiness"
1166 # until a failure occurs)
1168 # This test simulates an upload where servers break after peer
1169 # selection, but before they are written to.
1170 def _set_basedir(ign=None):
1171 self.basedir = self.mktemp()
1173 d = self._setup_and_upload();
1175 def _do_server_setup(ign):
# Five empty servers; with the original server removed below, the
# grid has exactly 5 writable servers for the happy=4 upload.
1176 self._add_server(server_number=1)
1177 self._add_server(server_number=2)
1178 self._add_server(server_number=3)
1179 self._add_server(server_number=4)
1180 self._add_server(server_number=5)
1181 d.addCallback(_do_server_setup)
1182 # remove the original server
1183 # (necessary to ensure that the Tahoe2PeerSelector will distribute
1185 def _remove_server(ign):
1186 server = self.g.servers_by_number[0]
1187 self.g.remove_server(server.my_nodeid)
1188 d.addCallback(_remove_server)
1189 # This should succeed; we still have 4 servers, and the
1190 # happiness of the upload is 4.
1191 d.addCallback(lambda ign:
1192 self._do_upload_with_broken_servers(1))
1193 # Now, do the same thing over again, but drop 2 servers instead
1194 # of 1. This should fail, because servers_of_happiness is 4 and
1195 # we can't satisfy that.
1196 d.addCallback(_set_basedir)
1197 d.addCallback(lambda ign:
1198 self._setup_and_upload())
1199 d.addCallback(_do_server_setup)
1200 d.addCallback(_remove_server)
1201 d.addCallback(lambda ign:
1202 self.shouldFail(UploadUnhappinessError,
1203 "test_dropped_servers_in_encoder",
1204 "shares could be placed on only 3 server(s) "
1205 "such that any 3 of them have enough shares to "
1206 "recover the file, but we were asked to place "
1207 "shares on at least 4",
1208 self._do_upload_with_broken_servers, 2))
1209 # Now do the same thing over again, but make some of the servers
1210 # readonly, break some of the ones that aren't, and make sure that
1211 # happiness accounting is preserved.
1212 d.addCallback(_set_basedir)
1213 d.addCallback(lambda ign:
1214 self._setup_and_upload())
1215 def _do_server_setup_2(ign):
# NOTE(review): the first lines of this helper (orig. 1216-1218)
# are missing from this excerpt; only the two read-only servers
# below are visible.
1219 self._add_server_with_share(4, 7, readonly=True)
1220 self._add_server_with_share(5, 8, readonly=True)
1221 d.addCallback(_do_server_setup_2)
1222 d.addCallback(_remove_server)
1223 d.addCallback(lambda ign:
1224 self._do_upload_with_broken_servers(1))
1225 d.addCallback(_set_basedir)
1226 d.addCallback(lambda ign:
1227 self._setup_and_upload())
1228 d.addCallback(_do_server_setup_2)
1229 d.addCallback(_remove_server)
1230 d.addCallback(lambda ign:
1231 self.shouldFail(UploadUnhappinessError,
1232 "test_dropped_servers_in_encoder",
1233 "shares could be placed on only 3 server(s) "
1234 "such that any 3 of them have enough shares to "
1235 "recover the file, but we were asked to place "
1236 "shares on at least 4",
1237 self._do_upload_with_broken_servers, 2))
1241 def test_merge_peers(self):
# Unit test for happinessutil.merge_peers(): merging an existing
# {shnum -> set(peerid)} mapping with a set of tracker objects
# (objects exposing peerid/buckets attributes) must yield the union
# mapping, and an empty tracker set must leave the dict unchanged.
# NOTE(review): the dict-literal openers (`shares = {`,
# `expected = {`, etc.) and the FakePeerTracker body fall on line
# numbers missing from this excerpt -- confirm against the full file.
1242 # merge_peers merges a list of used_peers and a dict of
1243 # shareid -> peerid mappings.
1245 1 : set(["server1"]),
1246 2 : set(["server2"]),
1247 3 : set(["server3"]),
1248 4 : set(["server4", "server5"]),
1249 5 : set(["server1", "server2"]),
1251 # if not provided with a used_peers argument, it should just
1252 # return the first argument unchanged.
1253 self.failUnlessEqual(shares, merge_peers(shares, set([])))
1254 class FakePeerTracker:
1257 for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1258 t = FakePeerTracker()
1263 1 : set(["server1"]),
1264 2 : set(["server2"]),
1265 3 : set(["server3"]),
1266 4 : set(["server4", "server5"]),
1267 5 : set(["server1", "server2", "server5"]),
1268 6 : set(["server6"]),
1269 7 : set(["server7"]),
1270 8 : set(["server8"]),
1272 self.failUnlessEqual(expected, merge_peers(shares, set(trackers)))
1275 5 : set(["server5"]),
1276 6 : set(["server6"]),
1277 7 : set(["server7"]),
1278 8 : set(["server8"]),
1280 self.failUnlessEqual(expected, merge_peers(shares2, set(trackers)))
1284 for (i, server) in [(i, "server%d" % i) for i in xrange(10)]:
1285 shares3[i] = set([server])
1286 t = FakePeerTracker()
1290 expected[i] = set([server])
1291 self.failUnlessEqual(expected, merge_peers(shares3, set(trackers)))
1294 def test_servers_of_happiness_utility_function(self):
# Unit test for happinessutil.servers_of_happiness(): the returned
# happiness is the size of a maximum matching in the bipartite
# graph between peerids and share numbers, so duplicate or
# overlapping servers must not inflate the count.
# NOTE(review): several dict/list literals and tracker-population
# lines fall on line numbers missing from this excerpt.
1295 # These tests are concerned with the servers_of_happiness()
1296 # utility function, and its underlying matching algorithm. Other
1297 # aspects of the servers_of_happiness behavior are tested
1298 # elsewhere. These tests exist to ensure that
1299 # servers_of_happiness doesn't under or overcount the happiness
1300 # value for given inputs.
1302 # servers_of_happiness expects a dict of
1303 # shnum => set(peerids) as a preexisting shares argument.
1305 1 : set(["server1"]),
1306 2 : set(["server2"]),
1307 3 : set(["server3"]),
1308 4 : set(["server4"])
1310 happy = servers_of_happiness(test1)
1311 self.failUnlessEqual(4, happy)
1312 test1[4] = set(["server1"])
1313 # We've added a duplicate server, so now servers_of_happiness
1314 # should be 3 instead of 4.
1315 happy = servers_of_happiness(test1)
1316 self.failUnlessEqual(3, happy)
1317 # The second argument of merge_peers should be a set of
1318 # objects with peerid and buckets as attributes. In actual use,
1319 # these will be PeerTracker instances, but for testing it is fine
1320 # to make a FakePeerTracker whose job is to hold those instance
1321 # variables to test that part.
1322 class FakePeerTracker:
1325 for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1326 t = FakePeerTracker()
1330 # Recall that test1 is a server layout with servers_of_happiness
1331 # = 3. Since there isn't any overlap between the shnum ->
1332 # set([peerid]) correspondences in test1 and those in trackers,
1333 # the result here should be 7.
1334 test2 = merge_peers(test1, set(trackers))
1335 happy = servers_of_happiness(test2)
1336 self.failUnlessEqual(7, happy)
1337 # Now add an overlapping server to trackers. This is redundant,
1338 # so it should not cause the previously reported happiness value
1340 t = FakePeerTracker()
1341 t.peerid = "server1"
1344 test2 = merge_peers(test1, set(trackers))
1345 happy = servers_of_happiness(test2)
1346 self.failUnlessEqual(7, happy)
1348 happy = servers_of_happiness(test)
1349 self.failUnlessEqual(0, happy)
1350 # Test a more substantial overlap between the trackers and the
1351 # existing assignments.
1353 1 : set(['server1']),
1354 2 : set(['server2']),
1355 3 : set(['server3']),
1356 4 : set(['server4']),
1359 t = FakePeerTracker()
1360 t.peerid = 'server5'
1363 t = FakePeerTracker()
1364 t.peerid = 'server6'
1367 # The value returned by servers_of_happiness is the size
1368 # of a maximum matching in the bipartite graph that
1369 # servers_of_happiness() makes between peerids and share
1370 # numbers. It should find something like this:
1371 # (server 1, share 1)
1372 # (server 2, share 2)
1373 # (server 3, share 3)
1374 # (server 5, share 4)
1375 # (server 6, share 5)
1377 # and, since there are 5 edges in this matching, it should
1379 test2 = merge_peers(test, set(trackers))
1380 happy = servers_of_happiness(test2)
1381 self.failUnlessEqual(5, happy)
1382 # Zooko's first puzzle:
1383 # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:156)
1385 # server 1: shares 0, 1
1386 # server 2: shares 1, 2
1389 # This should yield happiness of 3.
1391 0 : set(['server1']),
1392 1 : set(['server1', 'server2']),
1393 2 : set(['server2', 'server3']),
1395 self.failUnlessEqual(3, servers_of_happiness(test))
1396 # Zooko's second puzzle:
1397 # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:158)
1399 # server 1: shares 0, 1
1402 # This should yield happiness of 2.
1404 0 : set(['server1']),
1405 1 : set(['server1', 'server2']),
1407 self.failUnlessEqual(2, servers_of_happiness(test))
1410 def test_shares_by_server(self):
# Unit test for happinessutil.shares_by_server(): inverts a
# {shnum -> set(peerid)} mapping into {peerid -> set(shnum)}, and
# asserts (via AssertionError) when a value is not a set.
# NOTE(review): the `test1 = {` opener and part of the shouldFail
# argument list fall on line numbers missing from this excerpt.
1411 test = dict([(i, set(["server%d" % i])) for i in xrange(1, 5)])
1412 sbs = shares_by_server(test)
1413 self.failUnlessEqual(set([1]), sbs["server1"])
1414 self.failUnlessEqual(set([2]), sbs["server2"])
1415 self.failUnlessEqual(set([3]), sbs["server3"])
1416 self.failUnlessEqual(set([4]), sbs["server4"])
1418 1 : set(["server1"]),
1419 2 : set(["server1"]),
1420 3 : set(["server1"]),
1421 4 : set(["server2"]),
1422 5 : set(["server2"])
1424 sbs = shares_by_server(test1)
1425 self.failUnlessEqual(set([1, 2, 3]), sbs["server1"])
1426 self.failUnlessEqual(set([4, 5]), sbs["server2"])
1427 # This should fail unless the peerid part of the mapping is a set
1428 test2 = {1: "server1"}
1429 self.shouldFail(AssertionError,
1430 "test_shares_by_server",
1432 shares_by_server, test2)
1435 def test_existing_share_detection(self):
# The peer selector must count shares found on *read-only* servers
# toward happiness instead of filtering those servers out; with all
# ten shares on read-only server 1 plus three empty writable
# servers, an upload with happy=4 should succeed.
# NOTE(review): parts of the layout comment block and the trailing
# `return d` fall on line numbers missing from this excerpt.
1436 self.basedir = self.mktemp()
1437 d = self._setup_and_upload()
1438 # Our final setup should look like this:
1439 # server 1: shares 0 - 9, read-only
1443 # The purpose of this test is to make sure that the peer selector
1444 # knows about the shares on server 1, even though it is read-only.
1445 # It used to simply filter these out, which would cause the test
1446 # to fail when servers_of_happiness = 4.
1447 d.addCallback(lambda ign:
1448 self._add_server_with_share(1, 0, True))
1449 d.addCallback(lambda ign:
1450 self._add_server(2))
1451 d.addCallback(lambda ign:
1452 self._add_server(3))
1453 d.addCallback(lambda ign:
1454 self._add_server(4))
1455 def _copy_shares(ign):
1456 for i in xrange(1, 10):
1457 self._copy_share_to_server(i, 1)
1458 d.addCallback(_copy_shares)
1459 d.addCallback(lambda ign:
1460 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1461 def _prepare_client(ign):
# Returns the client so the next callback can upload through it.
1462 client = self.g.clients[0]
1463 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1465 d.addCallback(_prepare_client)
1466 d.addCallback(lambda client:
1467 client.upload(upload.Data("data" * 10000, convergence="")))
1471 def test_query_counting(self):
# Checks the query statistics reported in the
# UploadUnhappinessError message. Happiness is deliberately set to
# an unsatisfiable 45 so the upload fails and the diagnostic text
# (queries placed / placed none / due-to-full counts) becomes
# observable via shouldFail.
# NOTE(review): the `def _setup(ign):`-style helper headers and
# some trailing lines (e.g. the convergence kwarg of the upload
# calls, `return d`) fall on line numbers missing from this
# excerpt.
1472 # If peer selection fails, Tahoe2PeerSelector prints out a lot
1473 # of helpful diagnostic information, including query stats.
1474 # This test helps make sure that that information is accurate.
1475 self.basedir = self.mktemp()
1476 d = self._setup_and_upload()
1478 for i in xrange(1, 11):
1479 self._add_server(server_number=i)
1480 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1481 c = self.g.clients[0]
1482 # We set happy to an unsatisfiable value so that we can check the
1483 # counting in the exception message. The same progress message
1484 # is also used when the upload is successful, but in that case it
1485 # only gets written to a log, so we can't see what it says.
1486 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1488 d.addCallback(_setup)
1489 d.addCallback(lambda c:
1490 self.shouldFail(UploadUnhappinessError, "test_query_counting",
1491 "10 queries placed some shares",
1492 c.upload, upload.Data("data" * 10000,
1494 # Now try with some readonly servers. We want to make sure that
1495 # the readonly peer share discovery phase is counted correctly.
1497 self.basedir = self.mktemp()
1499 d.addCallback(_reset)
1500 d.addCallback(lambda ign:
1501 self._setup_and_upload())
1503 for i in xrange(1, 11):
1504 self._add_server(server_number=i)
1505 self._add_server(server_number=11, readonly=True)
1506 self._add_server(server_number=12, readonly=True)
1507 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1508 c = self.g.clients[0]
1509 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1511 d.addCallback(_then)
1512 d.addCallback(lambda c:
1513 self.shouldFail(UploadUnhappinessError, "test_query_counting",
1514 "2 placed none (of which 2 placed none due to "
1515 "the server being full",
1516 c.upload, upload.Data("data" * 10000,
1518 # Now try the case where the upload process finds a bunch of the
1519 # shares that it wants to place on the first server, including
1520 # the one that it wanted to allocate there. Though no shares will
1521 # be allocated in this request, it should still be called
1522 # productive, since it caused some homeless shares to be
1524 d.addCallback(_reset)
1525 d.addCallback(lambda ign:
1526 self._setup_and_upload())
1529 for i in xrange(1, 11):
1530 self._add_server(server_number=i)
1531 # Copy all of the shares to server 9, since that will be
1532 # the first one that the selector sees.
1533 for i in xrange(10):
1534 self._copy_share_to_server(i, 9)
1535 # Remove server 0, and its contents
1536 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1537 # Make happiness unsatisfiable
1538 c = self.g.clients[0]
1539 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1541 d.addCallback(_next)
1542 d.addCallback(lambda c:
1543 self.shouldFail(UploadUnhappinessError, "test_query_counting",
1544 "1 queries placed some shares",
1545 c.upload, upload.Data("data" * 10000,
1550 def test_upper_limit_on_readonly_queries(self):
# With ten read-only servers and k=2/happy=4/n=4, peer selection
# should cap its share-discovery queries ("sent 8 queries to 8
# peers" appears in the failure message) rather than querying every
# read-only server.
# NOTE(review): the `def _then(ign):` helper header and the
# client.upload reference in the shouldFail call fall on line
# numbers missing from this excerpt -- confirm against the full
# file.
1551 self.basedir = self.mktemp()
1552 d = self._setup_and_upload()
1554 for i in xrange(1, 11):
1555 self._add_server(server_number=i, readonly=True)
1556 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1557 c = self.g.clients[0]
1558 c.DEFAULT_ENCODING_PARAMETERS['k'] = 2
1559 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1560 c.DEFAULT_ENCODING_PARAMETERS['n'] = 4
1562 d.addCallback(_then)
1563 d.addCallback(lambda client:
1564 self.shouldFail(UploadUnhappinessError,
1565 "test_upper_limit_on_readonly_queries",
1566 "sent 8 queries to 8 peers",
1568 upload.Data('data' * 10000, convergence="")))
1572 def test_exception_messages_during_peer_selection(self):
# Walks through five failing-upload scenarios (all read-only, one
# broken server, too few servers, uneven spread, etc.) and pins the
# exact UploadUnhappinessError message produced by each, so the
# diagnostics stay accurate and informative.
# NOTE(review): this excerpt's embedded line numbers are
# non-contiguous; helper headers (e.g. `def _reset(ign):`), some
# _setup_and_upload callbacks, convergence kwargs, and the final
# `return d` are not visible here -- confirm against the full file.
1573 # server 1: read-only, no shares
1574 # server 2: read-only, no shares
1575 # server 3: read-only, no shares
1576 # server 4: read-only, no shares
1577 # server 5: read-only, no shares
1578 # This will fail, but we want to make sure that the log messages
1579 # are informative about why it has failed.
1580 self.basedir = self.mktemp()
1581 d = self._setup_and_upload()
1582 d.addCallback(lambda ign:
1583 self._add_server(server_number=1, readonly=True))
1584 d.addCallback(lambda ign:
1585 self._add_server(server_number=2, readonly=True))
1586 d.addCallback(lambda ign:
1587 self._add_server(server_number=3, readonly=True))
1588 d.addCallback(lambda ign:
1589 self._add_server(server_number=4, readonly=True))
1590 d.addCallback(lambda ign:
1591 self._add_server(server_number=5, readonly=True))
1592 d.addCallback(lambda ign:
1593 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1594 def _reset_encoding_parameters(ign, happy=4):
# Returns the client so later callbacks can upload through it.
1595 client = self.g.clients[0]
1596 client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1598 d.addCallback(_reset_encoding_parameters)
1599 d.addCallback(lambda client:
1600 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1601 "placed 0 shares out of 10 "
1602 "total (10 homeless), want to place shares on at "
1603 "least 4 servers such that any 3 of them have "
1604 "enough shares to recover the file, "
1605 "sent 5 queries to 5 peers, 0 queries placed "
1606 "some shares, 5 placed none "
1607 "(of which 5 placed none due to the server being "
1608 "full and 0 placed none due to an error)",
1610 upload.Data("data" * 10000, convergence="")))
1613 # server 1: read-only, no shares
1614 # server 2: broken, no shares
1615 # server 3: read-only, no shares
1616 # server 4: read-only, no shares
1617 # server 5: read-only, no shares
1619 self.basedir = self.mktemp()
1620 d.addCallback(_reset)
1621 d.addCallback(lambda ign:
1622 self._setup_and_upload())
1623 d.addCallback(lambda ign:
1624 self._add_server(server_number=1, readonly=True))
1625 d.addCallback(lambda ign:
1626 self._add_server(server_number=2))
1627 def _break_server_2(ign):
1628 server = self.g.servers_by_number[2].my_nodeid
1629 # We have to break the server in servers_by_id,
1630 # because the one in servers_by_number isn't wrapped,
1631 # and doesn't look at its broken attribute when answering
1633 self.g.servers_by_id[server].broken = True
1634 d.addCallback(_break_server_2)
1635 d.addCallback(lambda ign:
1636 self._add_server(server_number=3, readonly=True))
1637 d.addCallback(lambda ign:
1638 self._add_server(server_number=4, readonly=True))
1639 d.addCallback(lambda ign:
1640 self._add_server(server_number=5, readonly=True))
1641 d.addCallback(lambda ign:
1642 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1643 d.addCallback(_reset_encoding_parameters)
1644 d.addCallback(lambda client:
1645 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1646 "placed 0 shares out of 10 "
1647 "total (10 homeless), want to place shares on at "
1648 "least 4 servers such that any 3 of them have "
1649 "enough shares to recover the file, "
1650 "sent 5 queries to 5 peers, 0 queries placed "
1651 "some shares, 5 placed none "
1652 "(of which 4 placed none due to the server being "
1653 "full and 1 placed none due to an error)",
1655 upload.Data("data" * 10000, convergence="")))
1656 # server 0, server 1 = empty, accepting shares
1657 # This should place all of the shares, but still fail with happy=4.
1658 # We want to make sure that the exception message is worded correctly.
1659 d.addCallback(_reset)
1660 d.addCallback(lambda ign:
1662 d.addCallback(lambda ign:
1663 self._add_server(server_number=1))
1664 d.addCallback(_reset_encoding_parameters)
1665 d.addCallback(lambda client:
1666 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1667 "shares could be placed or found on only 2 "
1668 "server(s). We were asked to place shares on at "
1669 "least 4 server(s) such that any 3 of them have "
1670 "enough shares to recover the file.",
1671 client.upload, upload.Data("data" * 10000,
1673 # servers 0 - 4 = empty, accepting shares
1674 # This too should place all the shares, and this too should fail,
1675 # but since the effective happiness is more than the k encoding
1676 # parameter, it should trigger a different error message than the one
1678 d.addCallback(_reset)
1679 d.addCallback(lambda ign:
1681 d.addCallback(lambda ign:
1682 self._add_server(server_number=1))
1683 d.addCallback(lambda ign:
1684 self._add_server(server_number=2))
1685 d.addCallback(lambda ign:
1686 self._add_server(server_number=3))
1687 d.addCallback(lambda ign:
1688 self._add_server(server_number=4))
1689 d.addCallback(_reset_encoding_parameters, happy=7)
1690 d.addCallback(lambda client:
1691 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1692 "shares could be placed on only 5 server(s) such "
1693 "that any 3 of them have enough shares to recover "
1694 "the file, but we were asked to place shares on "
1695 "at least 7 such servers.",
1696 client.upload, upload.Data("data" * 10000,
1698 # server 0: shares 0 - 9
1699 # server 1: share 0, read-only
1700 # server 2: share 0, read-only
1701 # server 3: share 0, read-only
1702 # This should place all of the shares, but fail with happy=4.
1703 # Since the number of servers with shares is more than the number
1704 # necessary to reconstitute the file, this will trigger a different
1705 # error message than either of those above.
1706 d.addCallback(_reset)
1707 d.addCallback(lambda ign:
1708 self._setup_and_upload())
1709 d.addCallback(lambda ign:
1710 self._add_server_with_share(server_number=1, share_number=0,
1712 d.addCallback(lambda ign:
1713 self._add_server_with_share(server_number=2, share_number=0,
1715 d.addCallback(lambda ign:
1716 self._add_server_with_share(server_number=3, share_number=0,
1718 d.addCallback(_reset_encoding_parameters, happy=7)
1719 d.addCallback(lambda client:
1720 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1721 "shares could be placed or found on 4 server(s), "
1722 "but they are not spread out evenly enough to "
1723 "ensure that any 3 of these servers would have "
1724 "enough shares to recover the file. We were asked "
1725 "to place shares on at least 7 servers such that "
1726 "any 3 of them have enough shares to recover the "
1728 client.upload, upload.Data("data" * 10000,
1733 def test_problem_layout_comment_187(self):
# Demonstrates the share layout from ticket #778 comment:187 that
# broke an early redistribution algorithm (k=2, n=3, happy=3).
# Marked .todo below, so Trial expects this test to fail until the
# redistribution fix lands.
# NOTE(review): several lines of the _setup helper (readonly
# kwargs, share copies, return value) fall on line numbers missing
# from this excerpt -- confirm against the full file.
1734 # #778 comment 187 broke an initial attempt at a share
1735 # redistribution algorithm. This test is here to demonstrate the
1736 # breakage, and to test that subsequent algorithms don't also
1737 # break in the same way.
1738 self.basedir = self.mktemp()
1739 d = self._setup_and_upload(k=2, n=3)
1741 # server 1: shares 0, 1, 2, readonly
1742 # server 2: share 0, readonly
1745 self._add_server_with_share(server_number=1, share_number=0,
1747 self._add_server_with_share(server_number=2, share_number=0,
1749 self._add_server_with_share(server_number=3, share_number=0)
1751 self._copy_share_to_server(1, 1)
1752 self._copy_share_to_server(2, 1)
1754 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1755 client = self.g.clients[0]
1756 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 3
1759 d.addCallback(_setup)
1760 d.addCallback(lambda client:
1761 client.upload(upload.Data("data" * 10000, convergence="")))
1763 test_problem_layout_comment_187.todo = "this isn't fixed yet"
1766 def test_upload_succeeds_with_some_homeless_shares(self):
# An upload that satisfies servers-of-happiness (happy=4 across four
# read-only one-share servers) must be declared successful even
# though it cannot place all ten shares.
# NOTE(review): the readonly=True continuation lines of the
# multi-line _add_server_with_share calls and the trailing
# `return d` fall on line numbers missing from this excerpt.
1767 # If the upload is forced to stop trying to place shares before
1768 # it has placed (or otherwise accounted) for all of them, but it
1769 # has placed enough to satisfy the upload health criteria that
1770 # we're using, it should still succeed.
1771 self.basedir = self.mktemp()
1772 d = self._setup_and_upload()
1773 def _server_setup(ign):
1774 # Add four servers so that we have a layout like this:
1775 # server 1: share 0, read-only
1776 # server 2: share 1, read-only
1777 # server 3: share 2, read-only
1778 # server 4: share 3, read-only
1779 # If we set happy = 4, the upload will manage to satisfy
1780 # servers of happiness, but not place all of the shares; we
1781 # want to test that the upload is declared successful in
1783 self._add_server_with_share(server_number=1, share_number=0,
1785 self._add_server_with_share(server_number=2, share_number=1,
1787 self._add_server_with_share(server_number=3, share_number=2,
1789 self._add_server_with_share(server_number=4, share_number=3,
1792 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1793 # Set the client appropriately
1794 c = self.g.clients[0]
1795 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1797 d.addCallback(_server_setup)
1798 d.addCallback(lambda client:
1799 client.upload(upload.Data("data" * 10000, convergence="")))
1803 def test_uploader_skips_over_servers_with_only_one_share(self):
# The redistribution logic must pass over servers holding a single
# share (moving those shares cannot raise happiness) and keep
# looking for a server with multiple shares to redistribute from.
# NOTE(review): the loop header feeding _copy_share_to_server and
# the trailing `return d` fall on line numbers missing from this
# excerpt -- confirm against the full file.
1804 # We want to make sure that the redistribution logic ignores
1805 # servers with only one share, since placing these shares
1806 # elsewhere will at best keep happiness the same as it was, and
1808 self.basedir = self.mktemp()
1809 d = self._setup_and_upload()
1810 def _server_setup(ign):
1811 # Add some servers so that the upload will need to
1812 # redistribute, but will first pass over a couple of servers
1813 # that don't have enough shares to redistribute before
1814 # finding one that does have shares to redistribute.
1815 self._add_server_with_share(server_number=1, share_number=0)
1816 self._add_server_with_share(server_number=2, share_number=2)
1817 self._add_server_with_share(server_number=3, share_number=1)
1818 self._add_server_with_share(server_number=8, share_number=4)
1819 self._add_server_with_share(server_number=5, share_number=5)
1820 self._add_server_with_share(server_number=10, share_number=7)
1822 self._copy_share_to_server(i, 2)
1823 return self.g.clients[0]
1824 d.addCallback(_server_setup)
1825 d.addCallback(lambda client:
1826 client.upload(upload.Data("data" * 10000, convergence="")))
1830 def test_peer_selector_bucket_abort(self):
# When peer selection fails (unsatisfiable happy=7 on a 5-server
# grid), the buckets it had already allocated must be aborted so no
# storage stays reserved: every server's allocated_size() must be 0
# after the failure. fireEventually() is used as a turn barrier so
# the abort messages are delivered before the assertion runs.
# NOTE(review): the shouldFail message substring and the trailing
# `return d` fall on line numbers missing from this excerpt.
1831 # If peer selection for an upload fails due to an unhappy
1832 # layout, the peer selection process should abort the buckets it
1833 # allocates before failing, so that the space can be re-used.
1834 self.basedir = self.mktemp()
1835 self.set_up_grid(num_servers=5)
1837 # Try to upload a file with happy=7, which is unsatisfiable with
1838 # the current grid. This will fail, but should not take up any
1839 # space on the storage servers after it fails.
1840 client = self.g.clients[0]
1841 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
1842 d = defer.succeed(None)
1843 d.addCallback(lambda ignored:
1844 self.shouldFail(UploadUnhappinessError,
1845 "test_peer_selection_bucket_abort",
1847 client.upload, upload.Data("data" * 10000,
1849 # wait for the abort messages to get there.
1850 def _turn_barrier(res):
1851 return fireEventually(res)
1852 d.addCallback(_turn_barrier)
1854 for server in self.g.servers_by_number.values():
1855 self.failUnlessEqual(server.allocated_size(), 0)
1856 d.addCallback(_then)
1860 def test_encoder_bucket_abort(self):
# Companion to test_peer_selector_bucket_abort, but the failure
# happens in the Encoder: servers broken mid-upload make the layout
# unhappy (happy=7 on 4 servers), and the newly-allocated buckets
# must be cancelled so allocated_size() is 0 everywhere afterwards.
# NOTE(review): the `def _then(ign):` header, the shouldFail message
# substring, and the trailing `return d` fall on line numbers
# missing from this excerpt.
1861 # If enough servers die in the process of encoding and uploading
1862 # a file to make the layout unhappy, we should cancel the
1863 # newly-allocated buckets before dying.
1864 self.basedir = self.mktemp()
1865 self.set_up_grid(num_servers=4)
1867 client = self.g.clients[0]
1868 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
1870 d = defer.succeed(None)
1871 d.addCallback(lambda ignored:
1872 self.shouldFail(UploadUnhappinessError,
1873 "test_encoder_bucket_abort",
1875 self._do_upload_with_broken_servers, 1))
1876 def _turn_barrier(res):
# Let the queued abort messages be delivered before checking.
1877 return fireEventually(res)
1878 d.addCallback(_turn_barrier)
1880 for server in self.g.servers_by_number.values():
1881 self.failUnlessEqual(server.allocated_size(), 0)
1882 d.addCallback(_then)
1886 def _set_up_nodes_extra_config(self, clientdir):
# Hook called by the system-test mixin: rewrites the client's
# tahoe.cfg to use shares.needed=7 / shares.total=12 encoding.
# NOTE(review): neither file handle opened here is visibly closed
# in this excerpt, and the lines between the open() calls and the
# [client] section (orig. 1890-1891) plus the tail of the method
# (orig. 1895+) are missing -- presumably f.close() and the reuse
# of oldcfg live there; confirm against the full file.
1887 cfgfn = os.path.join(clientdir, "tahoe.cfg")
1888 oldcfg = open(cfgfn, "r").read()
1889 f = open(cfgfn, "wt")
1892 f.write("[client]\n")
1893 f.write("shares.needed = 7\n")
1894 f.write("shares.total = 12\n")
1900 # upload with exactly 75 peers (shares_of_happiness)
1901 # have a download fail
1902 # cancel a download (need to implement more cancel stuff)