3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.python.failure import Failure
6 from twisted.python import log
7 from twisted.internet import defer
8 from foolscap.api import fireEventually
10 import allmydata # for __full_version__
11 from allmydata import uri, monitor, client
12 from allmydata.immutable import upload, encode
13 from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
14 from allmydata.util.assertutil import precondition
15 from allmydata.util.deferredutil import DeferredListShouldSucceed
16 from allmydata.util.happinessutil import servers_of_happiness, \
17 shares_by_server, merge_peers
18 from no_network import GridTestMixin
19 from common_util import ShouldFailMixin
20 from allmydata.storage_client import StorageFarmBroker
21 from allmydata.storage.server import storage_index_to_dir
# Callback helper shared by many tests below: unwraps the URI string from an
# upload results object. NOTE(review): the body is elided in this excerpt --
# presumably "return results.uri"; confirm against the full file.
def extract_uri(results):
28 # Some of these took longer than 480 seconds on Zandr's arm box, but this may
29 # have been due to an earlier test ERROR'ing out due to timeout, which seems
30 # to screw up subsequent tests.
class Uploadable(unittest.TestCase):
    """Tests of the upload.FileHandle / upload.FileName / upload.Data
    wrappers: size reporting, incremental read(), and close() behavior.
    (Several setup lines are elided in this excerpt.)
    """
    def shouldEqual(self, data, expected):
        # read() hands back a list of str chunks; the elided lines presumably
        # iterate the chunks and join them into 's' before comparing -- confirm.
        self.failUnless(isinstance(data, list))
        self.failUnless(isinstance(e, str))
        self.failUnlessEqual(s, expected)

    def test_filehandle_random_key(self):
        # convergence=None requests a random per-file encryption key.
        return self._test_filehandle(convergence=None)

    def test_filehandle_convergent_encryption(self):
        return self._test_filehandle(convergence="some convergence string")

    def _test_filehandle(self, convergence):
        # (construction of the 41-byte source 's' and the deferred 'd' elided)
        u = upload.FileHandle(s, convergence=convergence)
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        # asking for 80 bytes returns only the 40 that remain
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
        d.addCallback(lambda res: s.close()) # that privilege is reserved for us

    def test_filename(self):
        basedir = "upload/Uploadable/test_filename"
        fn = os.path.join(basedir, "file")
        # (directory creation / file population elided)
        u = upload.FileName(fn, convergence=None)
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close())

        # NOTE(review): the following lines belong to a separate test_data
        # method whose 'def' line is elided in this excerpt.
        u = upload.Data(s, convergence=None)
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close())
class ServerError(Exception):
    # Simulated remote-side failure; the raise sites are elided here, but the
    # ServerErrors tests below assert this name appears in the failure text.
    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        # Mixin helper (enclosing class header not visible here): installs
        # fresh k/happy/n/segsize defaults on the fake client node.
        # (construction of the parameter dict 'p' is partially elided)
            "max_segment_size": max_segsize,
        self.node.DEFAULT_ENCODING_PARAMETERS = p
class FakeStorageServer:
    """In-memory stand-in for a remote storage server whose behavior is
    steered by a 'mode' string ("full", "first-fail", "second-fail",
    "already got them", ...). (Several lines are elided in this excerpt.)
    """
    def __init__(self, mode):
        # (mode / query-counter / allocation bookkeeping init elided)
        # Normal servers advertise a large (2**32) immutable share limit...
        self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                         { "maximum-immutable-share-size": 2**32 },
                         "application-version": str(allmydata.__full_version__),
        # ...while a second branch (condition elided; presumably the "small"
        # mode) advertises a 10-byte limit instead.
        self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                         { "maximum-immutable-share-size": 10 },
                         "application-version": str(allmydata.__full_version__),

    def callRemote(self, methname, *args, **kwargs):
        # Dispatch a "remote" call to the local method of the same name; the
        # elided wrapper/deferred lines keep the call asynchronous.
            meth = getattr(self, methname)
            return meth(*args, **kwargs)
        d.addCallback(lambda res: _call())

    def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
                         sharenums, share_size, canary):
        #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
        # Fault injection: fail on the first or second query, depending on
        # mode (the raise statements are elided in this excerpt).
        if self.mode == "first-fail":
            if self.queries == 0:
        if self.mode == "second-fail":
            if self.queries == 1:
        if self.mode == "full":
        elif self.mode == "already got them":
            # Claim every requested share already exists; allocate nothing.
            return (set(sharenums), {},)
        # Otherwise record each allocation and hand back one FakeBucketWriter
        # per requested share (the enclosing return statement is partially
        # elided here).
        for shnum in sharenums:
            self.allocated.append( (storage_index, shnum) )
            dict([( shnum, FakeBucketWriter(share_size) )
                  for shnum in sharenums]),
class FakeBucketWriter:
    # a diagnostic version of storageserver.BucketWriter
    def __init__(self, size):
        # (closed-flag and size bookkeeping elided in this excerpt)
        self.data = StringIO()

    def callRemote(self, methname, *args, **kwargs):
        # Route "remote" calls to the local remote_* methods; the elided
        # wrapper/deferred lines keep the call asynchronous.
            meth = getattr(self, "remote_" + methname)
            return meth(*args, **kwargs)
        d.addCallback(lambda res: _call())

    def remote_write(self, offset, data):
        # Enforce the BucketWriter contract: no writes after close, no
        # negative offsets, no writes past the declared share size.
        precondition(not self.closed)
        precondition(offset >= 0)
        precondition(offset+len(data) <= self._size,
                     "offset=%d + data=%d > size=%d" %
                     (offset, len(data), self._size))
        self.data.seek(offset)
        self.data.write(data)

    def remote_close(self):
        precondition(not self.closed)
        # (line marking the bucket closed is elided)

    def remote_abort(self):
        # Tests that trigger abort() are unexpected here; log loudly so the
        # failure shows up in the trial log.
        log.err(RuntimeError("uh oh, I was asked to abort"))
    # Class-level encoding defaults for the fake client (enclosing class
    # header not visible here); the happy/n entries and closing brace are
    # elided in this excerpt.
    DEFAULT_ENCODING_PARAMETERS = {"k":25,
                                   "max_segment_size": 1*MiB,
184 def __init__(self, mode="good", num_servers=50):
185 self.num_servers = num_servers
186 if type(mode) is str:
187 mode = dict([i,mode] for i in range(num_servers))
188 peers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
189 for fakeid in range(self.num_servers) ]
190 self.storage_broker = StorageFarmBroker(None, permute_peers=True)
191 for (serverid, server) in peers:
192 self.storage_broker.test_add_server(serverid, server)
193 self.last_peers = [p[1] for p in peers]
    def log(self, *args, **kwargs):
        # No-op logger hook for tests (body elided; presumably 'pass').
197 def get_encoding_parameters(self):
198 return self.DEFAULT_ENCODING_PARAMETERS
199 def get_storage_broker(self):
200 return self.storage_broker
    # Shared lease/convergence secrets used by every fake-client instance.
    _secret_holder = client.SecretHolder("lease secret", "convergence secret")
class GotTooFarError(Exception):
    # Raised by GiganticUploadable.read() if an upload that should have been
    # rejected for size actually reads past the early-termination threshold.
class GiganticUploadable(upload.FileHandle):
    """An IUploadable that pretends to hold 'size' bytes of zeros, used to
    test file-size limits without allocating real data.
    (Initialization of self._size / self._fp is elided in this excerpt.)
    """
    def __init__(self, size):
        # (body elided: presumably stores size and a read cursor -- confirm)

    def get_encryption_key(self):
        # Fixed all-zero key: file contents never matter for these tests.
        return defer.succeed("\x00" * 16)

    # (def get_size elided in this excerpt)
        return defer.succeed(self._size)

    def read(self, length):
        # Serve zeros up to the declared size; bail out loudly if the
        # uploader gets unreasonably far, because the caller expected an
        # early size rejection. (cursor-advance line elided)
        left = self._size - self._fp
        length = min(left, length)
        if self._fp > 1000000:
            # terminate the test early.
            raise GotTooFarError("we shouldn't be allowed to get this far")
        return defer.succeed(["\x00" * length])
227 Once upon a time, there was a beautiful princess named Buttercup. She lived
228 in a magical land where every file was stored securely among millions of
229 machines, and nobody ever worried about their data being lost ever again.
# DATA must be big enough that uploads produce real CHK files rather than
# being inlined into LIT URIs.
assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD

# (SIZE_ZERO / SIZE_SMALL definitions elided in this excerpt)
SIZE_LARGE = len(DATA)
def upload_data(uploader, data):
    """Upload an in-memory byte string through 'uploader'."""
    uploadable = upload.Data(data, convergence=None)
    return uploader.upload(uploadable)
def upload_filename(uploader, filename):
    """Upload the contents of the named file through 'uploader'."""
    uploadable = upload.FileName(filename, convergence=None)
    return uploader.upload(uploadable)
def upload_filehandle(uploader, fh):
    """Upload from an open file-like object through 'uploader'."""
    uploadable = upload.FileHandle(fh, convergence=None)
    return uploader.upload(uploadable)
class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
    """Upload tests against uniformly well-behaved fake servers.
    (Setup lines and trailing 'return d' lines are elided in this excerpt.)
    """
    # (def setUp elided)
        self.node = FakeClient(mode="good")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _check_small(self, newuri, size):
        # Small uploads must be packed into LIT URIs of the right length.
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.LiteralFileURI))
        self.failUnlessEqual(len(u.data), size)

    def _check_large(self, newuri, size):
        # Large uploads must produce CHK URIs with 16-byte SI and key.
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def get_data(self, size):
        # (body elided; presumably derives 'size' bytes from DATA -- confirm)

    def test_too_large(self):
        # we've removed the 4GiB share size limit (see ticket #346 for
        # details), but still have an 8-byte field, so the limit is now
        # 2**64, so make sure we reject files larger than that.
        k = 3; happy = 7; n = 10
        self.set_encoding_parameters(k, happy, n)
        # (definition of 'big' elided)
        data1 = GiganticUploadable(big)
        d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
                            "This file is too large to be uploaded (data_size)",
                            self.u.upload, data1)
        data2 = GiganticUploadable(big-3)
        d.addCallback(lambda res:
                      self.shouldFail(FileTooLargeError,
                                      "test_too_large-data2",
                                      "This file is too large to be uploaded (offsets)",
                                      self.u.upload, data2))
        # I don't know where the actual limit is.. it depends upon how large
        # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).

    def test_data_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)

    def test_data_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)

    def test_data_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)

    def test_data_large_odd_segments(self):
        data = self.get_data(SIZE_LARGE)
        segsize = int(SIZE_LARGE / 2.5)
        # we want 3 segments, since that's not a power of two
        self.set_encoding_parameters(25, 25, 100, segsize)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)

    def test_filehandle_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)

    def test_filehandle_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)

    def test_filehandle_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)

    def test_filename_zero(self):
        fn = "Uploader-test_filename_zero.data"
        data = self.get_data(SIZE_ZERO)
        # (writing 'data' to fn elided)
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)

    def test_filename_small(self):
        fn = "Uploader-test_filename_small.data"
        data = self.get_data(SIZE_SMALL)
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)

    def test_filename_large(self):
        fn = "Uploader-test_filename_large.data"
        data = self.get_data(SIZE_LARGE)
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
    """Upload behavior when some or all fake servers error out on their
    first or second allocate_buckets query.
    (Several lines are elided in this excerpt.)
    """
    def make_node(self, mode, num_servers=10):
        # Fresh fake client/uploader pair for each scenario.
        self.node = FakeClient(mode, num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _check_large(self, newuri, size):
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def test_first_error(self):
        # One good server among ten failing ones: the upload should still
        # succeed. (make_node call elided)
        mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
        self.set_encoding_parameters(k=25, happy=1, n=50)
        d = upload_data(self.u, DATA)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)

    def test_first_error_all(self):
        self.make_node("first-fail")
        d = self.shouldFail(UploadUnhappinessError, "first_error_all",
                            "peer selection failed",
                            upload_data, self.u, DATA)
        # (def _check(f) elided)
            self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
            # there should also be a 'last failure was' message
            self.failUnlessIn("ServerError", str(f.value))
        d.addCallback(_check)

    def test_second_error(self):
        # we want to make sure we make it to a third pass. This means that
        # the first pass was insufficient to place all shares, and at least
        # one of second pass servers (other than the last one) accepted a
        # share (so we'll believe that a third pass will be useful). (if
        # everyone but the last server throws an error, then we'll send all
        # the remaining shares to the last server at the end of the second
        # pass, and if that succeeds, we won't make it to a third pass).
        # we can achieve this 97.5% of the time by using 40 servers, having
        # 39 of them fail on the second request, leaving only one to succeed
        # on the second request. (we need to keep the number of servers low
        # enough to ensure a second pass with 100 shares).
        mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
        self.make_node(mode, 40)
        d = upload_data(self.u, DATA)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)

    def test_second_error_all(self):
        self.make_node("second-fail")
        d = self.shouldFail(UploadUnhappinessError, "second_error_all",
                            "peer selection failed",
                            upload_data, self.u, DATA)
        # (def _check(f) elided)
            self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
            # there should also be a 'last failure was' message
            self.failUnlessIn("ServerError", str(f.value))
        d.addCallback(_check)
class FullServer(unittest.TestCase):
    """When every server is full, an upload must fail unhappily."""
    # (def setUp elided)
        self.node = FakeClient(mode="full")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _should_fail(self, f):
        # The upload must errback with UploadUnhappinessError specifically.
        self.failUnless(isinstance(f, Failure) and f.check(UploadUnhappinessError), f)

    def test_data_large(self):
        # (data = self.get_data(SIZE_LARGE) elided)
        d = upload_data(self.u, data)
        d.addBoth(self._should_fail)
class PeerSelection(unittest.TestCase):
    """Checks how shares are distributed across peers for various k/happy/n
    combinations. (Setup lines, inner 'def _check' lines, and trailing
    'return d' lines are elided in this excerpt.)
    """
    def make_client(self, num_servers=50):
        self.node = FakeClient(mode="good", num_servers=num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def get_data(self, size):
        # (body elided; presumably builds 'size' bytes of test data)

    def _check_large(self, newuri, size):
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        # (parameter-dict construction partially elided)
            "max_segment_size": max_segsize,
        self.node.DEFAULT_ENCODING_PARAMETERS = p

    def test_one_each(self):
        # if we have 50 shares, and there are 50 peers, and they all accept a
        # share, we should get exactly one share per peer
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(25, 30, 50)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        # (def _check(res) elided)
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 1)
                self.failUnlessEqual(p.queries, 1)
        d.addCallback(_check)

    def test_two_each(self):
        # if we have 100 shares, and there are 50 peers, and they all accept
        # all shares, we should get exactly two shares per peer
        data = self.get_data(SIZE_LARGE)
        # if there are 50 peers, then happy needs to be <= 50
        self.set_encoding_parameters(50, 50, 100)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        # (def _check(res) elided)
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 2)
                self.failUnlessEqual(p.queries, 2)
        d.addCallback(_check)

    def test_one_each_plus_one_extra(self):
        # if we have 51 shares, and there are 50 peers, then one peer gets
        # two shares and the rest get just one
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(24, 41, 51)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        # (def _check(res) and got_one/got_two accumulators elided)
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnless(len(allocated) in (1,2), len(allocated))
                if len(allocated) == 1:
                    self.failUnlessEqual(p.queries, 1)
                # (else branch header elided)
                    self.failUnlessEqual(p.queries, 2)
            self.failUnlessEqual(len(got_one), 49)
            self.failUnlessEqual(len(got_two), 1)
        d.addCallback(_check)

    def test_four_each(self):
        # if we have 200 shares, and there are 50 peers, then each peer gets
        # 4 shares. The design goal is to accomplish this with only two
        # (remainder of this comment elided)
        data = self.get_data(SIZE_LARGE)
        # if there are 50 peers, then happy should be no more than 50 if
        # we want this to work.
        self.set_encoding_parameters(100, 50, 200)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        # (def _check(res) elided)
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 4)
                self.failUnlessEqual(p.queries, 2)
        d.addCallback(_check)

    def test_three_of_ten(self):
        # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
        # (remainder of this comment elided)
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 3, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        # (def _check(res) and 'counts = {}' elided)
            for p in self.node.last_peers:
                allocated = p.allocated
                counts[len(allocated)] = counts.get(len(allocated), 0) + 1
            histogram = [counts.get(i, 0) for i in range(5)]
            self.failUnlessEqual(histogram, [0,0,0,2,1])
        d.addCallback(_check)

    def test_some_big_some_small(self):
        # 10 shares, 20 servers, but half the servers don't support a
        # share-size large enough for our file
        mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)])
        self.node = FakeClient(mode, num_servers=20)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 5, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        # we should have put one share each on the big peers, and zero
        # shares on the small peers
        # (def _check(res) and total_allocated init elided)
            for p in self.node.last_peers:
                # ('if p.mode == "good":' branch header elided)
                    self.failUnlessEqual(len(p.allocated), 1)
                elif p.mode == "small":
                    self.failUnlessEqual(len(p.allocated), 0)
                total_allocated += len(p.allocated)
            self.failUnlessEqual(total_allocated, 10)
        d.addCallback(_check)
class StorageIndex(unittest.TestCase):
    """The storage index must track the convergence secret and the encoding
    parameters, and must be random when a random key is requested."""
    def test_params_must_matter(self):
        DATA = "I am some data"
        u = upload.Data(DATA, convergence="")
        eu = upload.EncryptAnUploadable(u)
        d1 = eu.get_storage_index()
        # CHK means the same data should encrypt the same way
        u = upload.Data(DATA, convergence="")
        eu = upload.EncryptAnUploadable(u)
        d1a = eu.get_storage_index()
        # but if we use a different convergence string it should be different
        u = upload.Data(DATA, convergence="wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt1 = eu.get_storage_index()
        # and if we add yet a different convergence it should be different again
        u = upload.Data(DATA, convergence="NOT wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt2 = eu.get_storage_index()
        # and if we use the first string again it should be the same as last time
        u = upload.Data(DATA, convergence="wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt1a = eu.get_storage_index()
        # and if we change the encoding parameters, it should be different (from the same convergence string with different encoding parameters)
        u = upload.Data(DATA, convergence="")
        u.encoding_param_k = u.default_encoding_param_k + 1
        eu = upload.EncryptAnUploadable(u)
        d2 = eu.get_storage_index()
        # and if we use a random key, it should be different than the CHK
        u = upload.Data(DATA, convergence=None)
        eu = upload.EncryptAnUploadable(u)
        d3 = eu.get_storage_index()
        # and different from another instance
        u = upload.Data(DATA, convergence=None)
        eu = upload.EncryptAnUploadable(u)
        d4 = eu.get_storage_index()
        d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
        # (def _done(res) elided; the following assertions run once all the
        # storage indexes have been computed)
            si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
            self.failUnlessEqual(si1, si1a)
            self.failIfEqual(si1, si2)
            self.failIfEqual(si1, si3)
            self.failIfEqual(si1, si4)
            self.failIfEqual(si3, si4)
            self.failIfEqual(si1salt1, si1)
            self.failIfEqual(si1salt1, si1salt2)
            self.failIfEqual(si1salt2, si1)
            self.failUnlessEqual(si1salt1, si1salt1a)
class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
    # (remaining base classes on the elided continuation line)
    def _do_upload_with_broken_servers(self, servers_to_break):
        """
        I act like a normal upload, but before I send the results of
        Tahoe2PeerSelector to the Encoder, I break the first servers_to_break
        PeerTrackers in the used_peers part of the return result.
        """
        assert self.g, "I tried to find a grid at self.g, but failed"
        broker = self.g.clients[0].storage_broker
        sh = self.g.clients[0]._secret_holder
        data = upload.Data("data" * 10000, convergence="")
        data.encoding_param_k = 3
        data.encoding_param_happy = 4
        data.encoding_param_n = 10
        uploadable = upload.EncryptAnUploadable(data)
        encoder = encode.Encoder()
        encoder.set_encrypted_uploadable(uploadable)
        status = upload.UploadStatus()
        selector = upload.Tahoe2PeerSelector("dglev", "test", status)
        storage_index = encoder.get_param("storage_index")
        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        d = selector.get_shareholders(broker, sh, storage_index,
                                      share_size, block_size, num_segments,
        # (remaining get_shareholders arguments elided)
        def _have_shareholders((used_peers, already_peers)):
            assert servers_to_break <= len(used_peers)
            # Abort every bucket of the first servers_to_break trackers.
            for index in xrange(servers_to_break):
                server = list(used_peers)[index]
                for share in server.buckets.keys():
                    server.buckets[share].abort()
            # ('buckets = {}' initialization elided)
            servermap = already_peers.copy()
            for peer in used_peers:
                buckets.update(peer.buckets)
                for bucket in peer.buckets:
                    servermap.setdefault(bucket, set()).add(peer.peerid)
            encoder.set_shareholders(buckets, servermap)
            # (return of the encoder start elided)
        d.addCallback(_have_shareholders)
        # ('return d' elided)
    def _add_server(self, server_number, readonly=False):
        # Add a fresh (optionally read-only) storage server to the grid.
        assert self.g, "I tried to find a grid at self.g, but failed"
        ss = self.g.make_server(server_number, readonly)
        self.g.add_server(server_number, ss)

    def _add_server_with_share(self, server_number, share_number=None,
        # (remainder of signature elided; presumably readonly=False)
        self._add_server(server_number, readonly)
        if share_number is not None:
            self._copy_share_to_server(share_number, server_number)

    def _copy_share_to_server(self, share_number, server_number):
        ss = self.g.servers_by_number[server_number]
        # Copy share i from the directory associated with the first
        # storage server to the directory associated with this one.
        assert self.g, "I tried to find a grid at self.g, but failed"
        assert self.shares, "I tried to find shares at self.shares, but failed"
        old_share_location = self.shares[share_number][2]
        new_share_location = os.path.join(ss.storedir, "shares")
        si = uri.from_string(self.uri).get_storage_index()
        new_share_location = os.path.join(new_share_location,
                                          storage_index_to_dir(si))
        if not os.path.exists(new_share_location):
            os.makedirs(new_share_location)
        new_share_location = os.path.join(new_share_location,
        # (final path component argument elided)
        if old_share_location != new_share_location:
            shutil.copy(old_share_location, new_share_location)
        shares = self.find_shares(self.uri)
        # Make sure that the storage server has the share.
        self.failUnless((share_number, ss.my_nodeid, new_share_location)
        # (closing 'in shares)' elided)
    def _setup_grid(self):
        """
        I set up a NoNetworkGrid with a single server and client.
        """
        self.set_up_grid(num_clients=1, num_servers=1)

    def _setup_and_upload(self, **kwargs):
        """
        I set up a NoNetworkGrid with a single server and client,
        upload a file to it, store its uri in self.uri, and store its
        sharedata in self.shares.
        """
        client = self.g.clients[0]
        client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
        if "n" in kwargs and "k" in kwargs:
            client.DEFAULT_ENCODING_PARAMETERS['k'] = kwargs['k']
            client.DEFAULT_ENCODING_PARAMETERS['n'] = kwargs['n']
        data = upload.Data("data" * 10000, convergence="")
        d = client.upload(data)
        # (def _store_uri elided)
        d.addCallback(_store_uri)
        d.addCallback(lambda ign:
            self.find_shares(self.uri))
        def _store_shares(shares):
            # (body elided: presumably saves shares on self.shares -- confirm)
        d.addCallback(_store_shares)
        # ('return d' elided)
    def test_configure_parameters(self):
        # Verifies that per-node config (installed via the hook) actually
        # changes the k/n used for an upload.
        self.basedir = self.mktemp()
        hooks = {0: self._set_up_nodes_extra_config}
        self.set_up_grid(client_config_hooks=hooks)
        c0 = self.g.clients[0]
        # (DATA definition and the upload call establishing 'd' elided)
        u = upload.Data(DATA, convergence="")
        d.addCallback(lambda ur: c0.create_node_from_uri(ur.uri))
        m = monitor.Monitor()
        d.addCallback(lambda fn: fn.check(m))
        # (def _check(cr) and data extraction elided)
            self.failUnlessEqual(data["count-shares-needed"], 7)
            self.failUnlessEqual(data["count-shares-expected"], 12)
        d.addCallback(_check)
    def _setUp(self, ns):
        # Used by test_happy_semantics and test_preexisting_share_behavior
        # to set up the grid.
        self.node = FakeClient(mode="good", num_servers=ns)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def test_happy_semantics(self):
        # (initial self._setUp call elided; starts with too few servers)
        DATA = upload.Data("kittens" * 10000, convergence="")
        # These parameters are unsatisfiable with only 2 servers.
        self.set_encoding_parameters(k=3, happy=5, n=10)
        d = self.shouldFail(UploadUnhappinessError, "test_happy_semantics",
                            "shares could be placed or found on only 2 "
                            "server(s). We were asked to place shares on "
                            "at least 5 server(s) such that any 3 of them "
                            "have enough shares to recover the file",
        # (upload callable arguments elided)
        # Let's reset the client to have 10 servers
        d.addCallback(lambda ign:
        # (_setUp(10) call elided)
        # These parameters are satisfiable with 10 servers.
        d.addCallback(lambda ign:
            self.set_encoding_parameters(k=3, happy=5, n=10))
        d.addCallback(lambda ign:
        # (successful upload call elided)
        # Let's reset the client to have 7 servers
        # (this is less than n, but more than h)
        d.addCallback(lambda ign:
        # (_setUp(7) call elided)
        # These parameters are satisfiable with 7 servers.
        d.addCallback(lambda ign:
            self.set_encoding_parameters(k=3, happy=5, n=10))
        d.addCallback(lambda ign:
        # (final upload call and 'return d' elided)
    def test_problem_layout_comment_52(self):
        # (basedir setup wrapper lines elided)
        self.basedir = self.mktemp()
        # This scenario is at
        # http://allmydata.org/trac/tahoe/ticket/778#comment:52
        #
        # The scenario in comment:52 proposes that we have a layout
        # like this (one comment line elided):
        # server 0: shares 1 - 9
        # server 1: share 0, read-only
        # server 2: share 0, read-only
        # server 3: share 0, read-only
        # To get access to the shares, we will first upload to one
        # server, which will then have shares 0 - 9. We'll then
        # add three new servers, configure them to not accept any new
        # shares, then write share 0 directly into the serverdir of each,
        # and then remove share 0 from server 0 in the same way.
        # Then each of servers 1 - 3 will report that they have share 0,
        # and will not accept any new share, while server 0 will report that
        # it has shares 1 - 9 and will accept new shares.
        # We'll then set 'happy' = 4, and see that an upload fails
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=0,
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0,
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=0,
        # (the trailing readonly=True arguments above are elided)
        # Remove the first share from server 0.
        def _remove_share_0_from_server_0():
            share_location = self.shares[0][2]
            os.remove(share_location)
        d.addCallback(lambda ign:
            _remove_share_0_from_server_0())
        # Set happy = 4 in the client.
        # (def _prepare() elided)
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
            # ('return client' elided)
        d.addCallback(lambda ign:
        # (_prepare() call elided)
        # Uploading data should fail
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError,
                            "test_problem_layout_comment_52_test_1",
                            "shares could be placed or found on 4 server(s), "
                            "but they are not spread out evenly enough to "
                            "ensure that any 3 of these servers would have "
                            "enough shares to recover the file. "
                            "We were asked to place shares on at "
                            "least 4 servers such that any 3 of them have "
                            "enough shares to recover the file",
                            client.upload, upload.Data("data" * 10000,
        # (convergence argument and closing parens elided)

        # Do comment:52, but like this:
        # (one layout comment line elided)
        # server 3: share 0, read-only
        # server 1: share 0, read-only
        # server 0: shares 0-9
        d.addCallback(lambda ign:
        # (basedir reset elided)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server(server_number=2))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=0,
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=0,
        # (readonly arguments elided; second _prepare-style helper elided)
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
        d.addCallback(lambda ign:
        # (helper call elided)
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError,
                            "test_problem_layout_comment_52_test_2",
                            "shares could be placed on only 3 server(s) such "
                            "that any 3 of them have enough shares to recover "
                            "the file, but we were asked to place shares on "
                            "at least 4 such servers.",
                            client.upload, upload.Data("data" * 10000,
        # (convergence argument and 'return d' elided)
    def test_problem_layout_comment_53(self):
        # This scenario is at
        # http://allmydata.org/trac/tahoe/ticket/778#comment:53
        #
        # Set up the grid to have one server
        def _change_basedir(ign):
            self.basedir = self.mktemp()
        _change_basedir(None)
        # We start by uploading all of the shares to one server.
        # Next, we'll add three new servers to our NoNetworkGrid. We'll add
        # one share from our initial upload to each of these.
        # The counterintuitive ordering of the share numbers is to deal with
        # the permuting of these servers -- distributing the shares this
        # way ensures that the Tahoe2PeerSelector sees them in the order
        # (continuation of this comment elided)
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1))
        # So, we now have the following layout:
        # server 0: shares 0 - 9
        # (layout lines for the other servers elided)
        # We change the 'happy' parameter in the client to 4.
        # The Tahoe2PeerSelector will see the peers permuted as:
        # (permutation detail elided)
        # Ideally, a reupload of our original data should work.
        def _reset_encoding_parameters(ign, happy=4):
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
            # ('return client' elided)
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))

        # This scenario is basically comment:53, but changed so that the
        # Tahoe2PeerSelector sees the server with all of the shares before
        # any of the other servers.
        # (layout comment partially elided)
        # server 2: shares 0 - 9
        # The Tahoe2PeerSelector sees the peers permuted as:
        # (permutation detail elided)
        # Note that server 0 has been replaced by server 4; this makes it
        # easier to ensure that the last server seen by Tahoe2PeerSelector
        # has only one share.
        d.addCallback(_change_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2))
        # Copy all of the other shares to server number 2
        def _copy_shares(ign):
            for i in xrange(0, 10):
                self._copy_share_to_server(i, 2)
        d.addCallback(_copy_shares)
        # Remove the first server, and add a placeholder with share 0
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=4, share_number=0))
        # Now try uploading.
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))

        # Try the same thing, but with empty servers after the first one
        # We want to make sure that Tahoe2PeerSelector will redistribute
        # shares as necessary, not simply discover an existing layout.
        # (layout comment partially elided)
        # server 2: shares 0 - 9
        d.addCallback(_change_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server(server_number=2))
        d.addCallback(lambda ign:
            self._add_server(server_number=3))
        d.addCallback(lambda ign:
            self._add_server(server_number=1))
        d.addCallback(lambda ign:
            self._add_server(server_number=4))
        d.addCallback(_copy_shares)
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        # Make sure that only as many shares as necessary to satisfy
        # servers of happiness were pushed.
        d.addCallback(lambda results:
            self.failUnlessEqual(results.pushed_shares, 3))
        # ('return d' elided)
# Verifies that shares already held by read-only servers still count toward
# the happy=4 servers-of-happiness requirement, so the upload succeeds.
# NOTE(review): the leading numbers are original-file line numbers fused into
# this listing; gaps (e.g. 1072, 1075, 1086, 1090, 1094+) mean lines are
# elided here. Code left byte-identical — restore from upstream before editing.
1060 def test_happiness_with_some_readonly_peers(self):
1061 # Try the following layout
1062 # server 2: shares 0-9
1063 # server 4: share 0, read-only
1064 # server 3: share 1, read-only
1065 # server 1: share 2, read-only
1066 self.basedir = self.mktemp()
1067 d = self._setup_and_upload()
1068 d.addCallback(lambda ign:
1069 self._add_server_with_share(server_number=2, share_number=0))
1070 d.addCallback(lambda ign:
1071 self._add_server_with_share(server_number=3, share_number=1,
# (elided line 1072 presumably closes this call with readonly=True — confirm)
1073 d.addCallback(lambda ign:
1074 self._add_server_with_share(server_number=1, share_number=2,
1076 # Copy all of the other shares to server number 2
1077 def _copy_shares(ign):
1078 for i in xrange(1, 10):
1079 self._copy_share_to_server(i, 2)
1080 d.addCallback(_copy_shares)
1081 # Remove server 0, and add another in its place
1082 d.addCallback(lambda ign:
1083 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1084 d.addCallback(lambda ign:
1085 self._add_server_with_share(server_number=4, share_number=0,
1087 def _reset_encoding_parameters(ign, happy=4):
1088 client = self.g.clients[0]
1089 client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1091 d.addCallback(_reset_encoding_parameters)
1092 d.addCallback(lambda client:
1093 client.upload(upload.Data("data" * 10000, convergence="")))
# Like the previous test, but every remaining server is read-only: the
# read-only-peer survey must not undercount servers of happiness.
# NOTE(review): numeric prefixes are fused original line numbers; gaps
# (1108, 1111, 1114, 1117, 1127, 1131+) mark elided lines. Code unchanged.
1097 def test_happiness_with_all_readonly_peers(self):
1098 # server 3: share 1, read-only
1099 # server 1: share 2, read-only
1100 # server 2: shares 0-9, read-only
1101 # server 4: share 0, read-only
1102 # The idea with this test is to make sure that the survey of
1103 # read-only peers doesn't undercount servers of happiness
1104 self.basedir = self.mktemp()
1105 d = self._setup_and_upload()
1106 d.addCallback(lambda ign:
1107 self._add_server_with_share(server_number=4, share_number=0,
1109 d.addCallback(lambda ign:
1110 self._add_server_with_share(server_number=3, share_number=1,
1112 d.addCallback(lambda ign:
1113 self._add_server_with_share(server_number=1, share_number=2,
1115 d.addCallback(lambda ign:
1116 self._add_server_with_share(server_number=2, share_number=0,
1118 def _copy_shares(ign):
1119 for i in xrange(1, 10):
1120 self._copy_share_to_server(i, 2)
1121 d.addCallback(_copy_shares)
1122 d.addCallback(lambda ign:
1123 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1124 def _reset_encoding_parameters(ign, happy=4):
1125 client = self.g.clients[0]
1126 client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1128 d.addCallback(_reset_encoding_parameters)
1129 d.addCallback(lambda client:
1130 client.upload(upload.Data("data" * 10000, convergence="")))
# Exercises the Encoder's own servers_of_happiness re-check when buckets are
# lost after peer selection but before writing: dropping 1 of 5 extra servers
# must succeed (happy=4 still met); dropping 2 must raise
# UploadUnhappinessError with the exact message asserted below.
# NOTE(review): fused line numbers with gaps (1139, 1144, 1146, 1156,
# 1188-1190, 1210+) mean lines are elided; code left byte-identical.
# The stray trailing ';' on line 1145 is harmless in Python but unidiomatic.
1134 def test_dropped_servers_in_encoder(self):
1135 # The Encoder does its own "servers_of_happiness" check if it
1136 # happens to lose a bucket during an upload (it assumes that
1137 # the layout presented to it satisfies "servers_of_happiness"
1138 # until a failure occurs)
1140 # This test simulates an upload where servers break after peer
1141 # selection, but before they are written to.
1142 def _set_basedir(ign=None):
1143 self.basedir = self.mktemp()
1145 d = self._setup_and_upload();
1147 def _do_server_setup(ign):
1148 self._add_server(server_number=1)
1149 self._add_server(server_number=2)
1150 self._add_server(server_number=3)
1151 self._add_server(server_number=4)
1152 self._add_server(server_number=5)
1153 d.addCallback(_do_server_setup)
1154 # remove the original server
1155 # (necessary to ensure that the Tahoe2PeerSelector will distribute
1157 def _remove_server(ign):
1158 server = self.g.servers_by_number[0]
1159 self.g.remove_server(server.my_nodeid)
1160 d.addCallback(_remove_server)
1161 # This should succeed; we still have 4 servers, and the
1162 # happiness of the upload is 4.
1163 d.addCallback(lambda ign:
1164 self._do_upload_with_broken_servers(1))
1165 # Now, do the same thing over again, but drop 2 servers instead
1166 # of 1. This should fail, because servers_of_happiness is 4 and
1167 # we can't satisfy that.
1168 d.addCallback(_set_basedir)
1169 d.addCallback(lambda ign:
1170 self._setup_and_upload())
1171 d.addCallback(_do_server_setup)
1172 d.addCallback(_remove_server)
1173 d.addCallback(lambda ign:
1174 self.shouldFail(UploadUnhappinessError,
1175 "test_dropped_servers_in_encoder",
1176 "shares could be placed on only 3 server(s) "
1177 "such that any 3 of them have enough shares to "
1178 "recover the file, but we were asked to place "
1179 "shares on at least 4",
1180 self._do_upload_with_broken_servers, 2))
1181 # Now do the same thing over again, but make some of the servers
1182 # readonly, break some of the ones that aren't, and make sure that
1183 # happiness accounting is preserved.
1184 d.addCallback(_set_basedir)
1185 d.addCallback(lambda ign:
1186 self._setup_and_upload())
1187 def _do_server_setup_2(ign):
# (elided lines 1188-1190 presumably add the writable servers — confirm upstream)
1191 self._add_server_with_share(4, 7, readonly=True)
1192 self._add_server_with_share(5, 8, readonly=True)
1193 d.addCallback(_do_server_setup_2)
1194 d.addCallback(_remove_server)
1195 d.addCallback(lambda ign:
1196 self._do_upload_with_broken_servers(1))
1197 d.addCallback(_set_basedir)
1198 d.addCallback(lambda ign:
1199 self._setup_and_upload())
1200 d.addCallback(_do_server_setup_2)
1201 d.addCallback(_remove_server)
1202 d.addCallback(lambda ign:
1203 self.shouldFail(UploadUnhappinessError,
1204 "test_dropped_servers_in_encoder",
1205 "shares could be placed on only 3 server(s) "
1206 "such that any 3 of them have enough shares to "
1207 "recover the file, but we were asked to place "
1208 "shares on at least 4",
1209 self._do_upload_with_broken_servers, 2))
# Unit-tests merge_peers(): with an empty used_peers set it must return its
# first argument unchanged; with FakePeerTracker entries it must union the
# tracker-held share->peer assignments into the existing mapping.
# NOTE(review): fused line numbers; gaps (1216, 1222, 1227-1228, 1231-1234,
# 1243, 1245-1246, 1251, 1253-1255, 1259-1261) mark elided lines (dict
# openers/closers, tracker setup). Code left byte-identical.
1213 def test_merge_peers(self):
1214 # merge_peers merges a list of used_peers and a dict of
1215 # shareid -> peerid mappings.
1217 1 : set(["server1"]),
1218 2 : set(["server2"]),
1219 3 : set(["server3"]),
1220 4 : set(["server4", "server5"]),
1221 5 : set(["server1", "server2"]),
1223 # if not provided with a used_peers argument, it should just
1224 # return the first argument unchanged.
1225 self.failUnlessEqual(shares, merge_peers(shares, set([])))
1226 class FakePeerTracker:
1229 for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1230 t = FakePeerTracker()
1235 1 : set(["server1"]),
1236 2 : set(["server2"]),
1237 3 : set(["server3"]),
1238 4 : set(["server4", "server5"]),
1239 5 : set(["server1", "server2", "server5"]),
1240 6 : set(["server6"]),
1241 7 : set(["server7"]),
1242 8 : set(["server8"]),
1244 self.failUnlessEqual(expected, merge_peers(shares, set(trackers)))
1247 5 : set(["server5"]),
1248 6 : set(["server6"]),
1249 7 : set(["server7"]),
1250 8 : set(["server8"]),
1252 self.failUnlessEqual(expected, merge_peers(shares2, set(trackers)))
1256 for (i, server) in [(i, "server%d" % i) for i in xrange(10)]:
1257 shares3[i] = set([server])
1258 t = FakePeerTracker()
1262 expected[i] = set([server])
1263 self.failUnlessEqual(expected, merge_peers(shares3, set(trackers)))
# Pins down servers_of_happiness(): it returns the size of a maximum matching
# in the bipartite (peerid, shnum) graph, so duplicate servers must not
# inflate the count (4 -> 3), merged trackers extend it (-> 7), redundant
# overlap leaves it unchanged, and Zooko's two ticket-778 puzzles yield 3 / 2.
# NOTE(review): fused line numbers; gaps (1273, 1276, 1281, 1295-1296,
# 1299-1301, 1311, 1314-1315, 1319, 1324, 1329-1330, 1333-1334, 1337-1338,
# 1348, 1350, 1356, 1359-1360, 1362, 1366, 1370, 1372-1373, 1375, 1378)
# mark elided lines. Code left byte-identical.
1266 def test_servers_of_happiness_utility_function(self):
1267 # These tests are concerned with the servers_of_happiness()
1268 # utility function, and its underlying matching algorithm. Other
1269 # aspects of the servers_of_happiness behavior are tested
1270 # elsewhere. These tests exist to ensure that
1271 # servers_of_happiness doesn't under or overcount the happiness
1272 # value for given inputs.
1274 # servers_of_happiness expects a dict of
1275 # shnum => set(peerids) as a preexisting shares argument.
1277 1 : set(["server1"]),
1278 2 : set(["server2"]),
1279 3 : set(["server3"]),
1280 4 : set(["server4"])
1282 happy = servers_of_happiness(test1)
1283 self.failUnlessEqual(4, happy)
1284 test1[4] = set(["server1"])
1285 # We've added a duplicate server, so now servers_of_happiness
1286 # should be 3 instead of 4.
1287 happy = servers_of_happiness(test1)
1288 self.failUnlessEqual(3, happy)
1289 # The second argument of merge_peers should be a set of
1290 # objects with peerid and buckets as attributes. In actual use,
1291 # these will be PeerTracker instances, but for testing it is fine
1292 # to make a FakePeerTracker whose job is to hold those instance
1293 # variables to test that part.
1294 class FakePeerTracker:
1297 for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1298 t = FakePeerTracker()
1302 # Recall that test1 is a server layout with servers_of_happiness
1303 # = 3. Since there isn't any overlap between the shnum ->
1304 # set([peerid]) correspondences in test1 and those in trackers,
1305 # the result here should be 7.
1306 test2 = merge_peers(test1, set(trackers))
1307 happy = servers_of_happiness(test2)
1308 self.failUnlessEqual(7, happy)
1309 # Now add an overlapping server to trackers. This is redundant,
1310 # so it should not cause the previously reported happiness value
1312 t = FakePeerTracker()
1313 t.peerid = "server1"
1316 test2 = merge_peers(test1, set(trackers))
1317 happy = servers_of_happiness(test2)
1318 self.failUnlessEqual(7, happy)
# (elided line 1319 presumably sets test to an empty mapping — confirm upstream)
1320 happy = servers_of_happiness(test)
1321 self.failUnlessEqual(0, happy)
1322 # Test a more substantial overlap between the trackers and the
1323 # existing assignments.
1325 1 : set(['server1']),
1326 2 : set(['server2']),
1327 3 : set(['server3']),
1328 4 : set(['server4']),
1331 t = FakePeerTracker()
1332 t.peerid = 'server5'
1335 t = FakePeerTracker()
1336 t.peerid = 'server6'
1339 # The value returned by servers_of_happiness is the size
1340 # of a maximum matching in the bipartite graph that
1341 # servers_of_happiness() makes between peerids and share
1342 # numbers. It should find something like this:
1343 # (server 1, share 1)
1344 # (server 2, share 2)
1345 # (server 3, share 3)
1346 # (server 5, share 4)
1347 # (server 6, share 5)
1349 # and, since there are 5 edges in this matching, it should
1351 test2 = merge_peers(test, set(trackers))
1352 happy = servers_of_happiness(test2)
1353 self.failUnlessEqual(5, happy)
1354 # Zooko's first puzzle:
1355 # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:156)
1357 # server 1: shares 0, 1
1358 # server 2: shares 1, 2
1361 # This should yield happiness of 3.
1363 0 : set(['server1']),
1364 1 : set(['server1', 'server2']),
1365 2 : set(['server2', 'server3']),
1367 self.failUnlessEqual(3, servers_of_happiness(test))
1368 # Zooko's second puzzle:
1369 # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:158)
1371 # server 1: shares 0, 1
1374 # This should yield happiness of 2.
1376 0 : set(['server1']),
1377 1 : set(['server1', 'server2']),
1379 self.failUnlessEqual(2, servers_of_happiness(test))
# Checks shares_by_server(): it inverts a {shnum: set(peerids)} mapping into
# {peerid: set(shnums)}, and asserts (via shouldFail/AssertionError) that the
# peerid side of the input must be a set.
# NOTE(review): fused line numbers; gaps (1389, 1395, 1403) mark elided lines
# (the test1 dict opener/closer and the expected-substring argument).
1383 def test_shares_by_server(self):
1384 test = dict([(i, set(["server%d" % i])) for i in xrange(1, 5)])
1385 sbs = shares_by_server(test)
1386 self.failUnlessEqual(set([1]), sbs["server1"])
1387 self.failUnlessEqual(set([2]), sbs["server2"])
1388 self.failUnlessEqual(set([3]), sbs["server3"])
1389 self.failUnlessEqual(set([4]), sbs["server4"])
1390 1 : set(["server1"]),
1391 2 : set(["server1"]),
1392 3 : set(["server1"]),
1393 4 : set(["server2"]),
1394 5 : set(["server2"])
1396 sbs = shares_by_server(test1)
1397 self.failUnlessEqual(set([1, 2, 3]), sbs["server1"])
1398 self.failUnlessEqual(set([4, 5]), sbs["server2"])
1399 # This should fail unless the peerid part of the mapping is a set
1400 test2 = {1: "server1"}
1401 self.shouldFail(AssertionError,
1402 "test_shares_by_server",
1404 shares_by_server, test2)
# The peer selector must account for shares that already live on a read-only
# server (server 1 here) instead of filtering read-only peers out, so an
# upload with happy=4 succeeds.
# NOTE(review): fused line numbers; gaps (1412-1414, 1436, 1440+) mark elided
# lines. Code left byte-identical.
1407 def test_existing_share_detection(self):
1408 self.basedir = self.mktemp()
1409 d = self._setup_and_upload()
1410 # Our final setup should look like this:
1411 # server 1: shares 0 - 9, read-only
1415 # The purpose of this test is to make sure that the peer selector
1416 # knows about the shares on server 1, even though it is read-only.
1417 # It used to simply filter these out, which would cause the test
1418 # to fail when servers_of_happiness = 4.
1419 d.addCallback(lambda ign:
1420 self._add_server_with_share(1, 0, True))
1421 d.addCallback(lambda ign:
1422 self._add_server(2))
1423 d.addCallback(lambda ign:
1424 self._add_server(3))
1425 d.addCallback(lambda ign:
1426 self._add_server(4))
1427 def _copy_shares(ign):
1428 for i in xrange(1, 10):
1429 self._copy_share_to_server(i, 1)
1430 d.addCallback(_copy_shares)
1431 d.addCallback(lambda ign:
1432 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1433 def _prepare_client(ign):
1434 client = self.g.clients[0]
1435 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1437 d.addCallback(_prepare_client)
1438 d.addCallback(lambda client:
1439 client.upload(upload.Data("data" * 10000, convergence="")))
# Validates the query statistics reported in Tahoe2PeerSelector's
# UploadUnhappinessError message, using an unsatisfiable happy=45 so the
# progress message surfaces in the exception: plain servers, read-only
# servers, and the "found existing shares counts as productive" case.
# NOTE(review): fused line numbers; gaps (1449, 1459, 1465, 1468, 1470,
# 1474, 1482, 1489, 1495, 1499-1500, 1512, 1518+) mark elided lines
# (several 'def _setup/_reset/_then/_next(ign):' headers and 'return c'
# lines appear to be among them — confirm upstream). Code unchanged.
1443 def test_query_counting(self):
1444 # If peer selection fails, Tahoe2PeerSelector prints out a lot
1445 # of helpful diagnostic information, including query stats.
1446 # This test helps make sure that that information is accurate.
1447 self.basedir = self.mktemp()
1448 d = self._setup_and_upload()
1450 for i in xrange(1, 11):
1451 self._add_server(server_number=i)
1452 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1453 c = self.g.clients[0]
1454 # We set happy to an unsatisfiable value so that we can check the
1455 # counting in the exception message. The same progress message
1456 # is also used when the upload is successful, but in that case it
1457 # only gets written to a log, so we can't see what it says.
1458 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1460 d.addCallback(_setup)
1461 d.addCallback(lambda c:
1462 self.shouldFail(UploadUnhappinessError, "test_query_counting",
1463 "10 queries placed some shares",
1464 c.upload, upload.Data("data" * 10000,
1466 # Now try with some readonly servers. We want to make sure that
1467 # the readonly peer share discovery phase is counted correctly.
1469 self.basedir = self.mktemp()
1471 d.addCallback(_reset)
1472 d.addCallback(lambda ign:
1473 self._setup_and_upload())
1475 for i in xrange(1, 11):
1476 self._add_server(server_number=i)
1477 self._add_server(server_number=11, readonly=True)
1478 self._add_server(server_number=12, readonly=True)
1479 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1480 c = self.g.clients[0]
1481 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1483 d.addCallback(_then)
1484 d.addCallback(lambda c:
1485 self.shouldFail(UploadUnhappinessError, "test_query_counting",
1486 "2 placed none (of which 2 placed none due to "
1487 "the server being full",
1488 c.upload, upload.Data("data" * 10000,
1490 # Now try the case where the upload process finds a bunch of the
1491 # shares that it wants to place on the first server, including
1492 # the one that it wanted to allocate there. Though no shares will
1493 # be allocated in this request, it should still be called
1494 # productive, since it caused some homeless shares to be
1496 d.addCallback(_reset)
1497 d.addCallback(lambda ign:
1498 self._setup_and_upload())
1501 for i in xrange(1, 11):
1502 self._add_server(server_number=i)
1503 # Copy all of the shares to server 9, since that will be
1504 # the first one that the selector sees.
1505 for i in xrange(10):
1506 self._copy_share_to_server(i, 9)
1507 # Remove server 0, and its contents
1508 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1509 # Make happiness unsatisfiable
1510 c = self.g.clients[0]
1511 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1513 d.addCallback(_next)
1514 d.addCallback(lambda c:
1515 self.shouldFail(UploadUnhappinessError, "test_query_counting",
1516 "1 queries placed some shares",
1517 c.upload, upload.Data("data" * 10000,
# With ten read-only servers and k=2/happy=4/n=4, peer selection must stop
# after a bounded number of read-only queries ("sent 8 queries to 8 peers")
# rather than asking every server, then fail with UploadUnhappinessError.
# NOTE(review): fused line numbers; gaps (1525, 1533, 1539) mark elided lines
# (the '_then' header and the client.upload argument line). Code unchanged.
1522 def test_upper_limit_on_readonly_queries(self):
1523 self.basedir = self.mktemp()
1524 d = self._setup_and_upload()
1526 for i in xrange(1, 11):
1527 self._add_server(server_number=i, readonly=True)
1528 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1529 c = self.g.clients[0]
1530 c.DEFAULT_ENCODING_PARAMETERS['k'] = 2
1531 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1532 c.DEFAULT_ENCODING_PARAMETERS['n'] = 4
1534 d.addCallback(_then)
1535 d.addCallback(lambda client:
1536 self.shouldFail(UploadUnhappinessError,
1537 "test_upper_limit_on_readonly_queries",
1538 "sent 8 queries to 8 peers",
1540 upload.Data('data' * 10000, convergence="")))
# Asserts the exact wording of UploadUnhappinessError for five failure
# scenarios: all-read-only peers; read-only plus one broken peer; all shares
# placed but on too few servers (happy=4); all shares placed with happiness 5
# < happy=7; and shares found on 4 servers but not spread evenly enough.
# NOTE(review): fused line numbers; gaps (1569, 1581, 1583-1584, 1590, 1604,
# 1618, 1630, 1637, 1648, 1653, 1656, 1673, 1687, 1690, 1693, 1703, 1705+)
# mark elided lines. Code left byte-identical.
1544 def test_exception_messages_during_peer_selection(self):
1545 # server 1: read-only, no shares
1546 # server 2: read-only, no shares
1547 # server 3: read-only, no shares
1548 # server 4: read-only, no shares
1549 # server 5: read-only, no shares
1550 # This will fail, but we want to make sure that the log messages
1551 # are informative about why it has failed.
1552 self.basedir = self.mktemp()
1553 d = self._setup_and_upload()
1554 d.addCallback(lambda ign:
1555 self._add_server(server_number=1, readonly=True))
1556 d.addCallback(lambda ign:
1557 self._add_server(server_number=2, readonly=True))
1558 d.addCallback(lambda ign:
1559 self._add_server(server_number=3, readonly=True))
1560 d.addCallback(lambda ign:
1561 self._add_server(server_number=4, readonly=True))
1562 d.addCallback(lambda ign:
1563 self._add_server(server_number=5, readonly=True))
1564 d.addCallback(lambda ign:
1565 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1566 def _reset_encoding_parameters(ign):
1567 client = self.g.clients[0]
1568 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1570 d.addCallback(_reset_encoding_parameters)
1571 d.addCallback(lambda client:
1572 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1573 "placed 0 shares out of 10 "
1574 "total (10 homeless), want to place shares on at "
1575 "least 4 servers such that any 3 of them have "
1576 "enough shares to recover the file, "
1577 "sent 5 queries to 5 peers, 0 queries placed "
1578 "some shares, 5 placed none "
1579 "(of which 5 placed none due to the server being "
1580 "full and 0 placed none due to an error)",
1582 upload.Data("data" * 10000, convergence="")))
1585 # server 1: read-only, no shares
1586 # server 2: broken, no shares
1587 # server 3: read-only, no shares
1588 # server 4: read-only, no shares
1589 # server 5: read-only, no shares
1591 self.basedir = self.mktemp()
1592 d.addCallback(_reset)
1593 d.addCallback(lambda ign:
1594 self._setup_and_upload())
1595 d.addCallback(lambda ign:
1596 self._add_server(server_number=1, readonly=True))
1597 d.addCallback(lambda ign:
1598 self._add_server(server_number=2))
1599 def _break_server_2(ign):
1600 server = self.g.servers_by_number[2].my_nodeid
1601 # We have to break the server in servers_by_id,
1602 # because the one in servers_by_number isn't wrapped,
1603 # and doesn't look at its broken attribute when answering
1605 self.g.servers_by_id[server].broken = True
1606 d.addCallback(_break_server_2)
1607 d.addCallback(lambda ign:
1608 self._add_server(server_number=3, readonly=True))
1609 d.addCallback(lambda ign:
1610 self._add_server(server_number=4, readonly=True))
1611 d.addCallback(lambda ign:
1612 self._add_server(server_number=5, readonly=True))
1613 d.addCallback(lambda ign:
1614 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1615 def _reset_encoding_parameters(ign, happy=4):
1616 client = self.g.clients[0]
1617 client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1619 d.addCallback(_reset_encoding_parameters)
1620 d.addCallback(lambda client:
1621 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1622 "placed 0 shares out of 10 "
1623 "total (10 homeless), want to place shares on at "
1624 "least 4 servers such that any 3 of them have "
1625 "enough shares to recover the file, "
1626 "sent 5 queries to 5 peers, 0 queries placed "
1627 "some shares, 5 placed none "
1628 "(of which 4 placed none due to the server being "
1629 "full and 1 placed none due to an error)",
1631 upload.Data("data" * 10000, convergence="")))
1632 # server 0, server 1 = empty, accepting shares
1633 # This should place all of the shares, but still fail with happy=4.
1634 # We want to make sure that the exception message is worded correctly.
1635 d.addCallback(_reset)
1636 d.addCallback(lambda ign:
1638 d.addCallback(lambda ign:
1639 self._add_server(server_number=1))
1640 d.addCallback(_reset_encoding_parameters)
1641 d.addCallback(lambda client:
1642 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1643 "shares could be placed or found on only 2 "
1644 "server(s). We were asked to place shares on at "
1645 "least 4 server(s) such that any 3 of them have "
1646 "enough shares to recover the file.",
1647 client.upload, upload.Data("data" * 10000,
1649 # servers 0 - 4 = empty, accepting shares
1650 # This too should place all the shares, and this too should fail,
1651 # but since the effective happiness is more than the k encoding
1652 # parameter, it should trigger a different error message than the one
1654 d.addCallback(_reset)
1655 d.addCallback(lambda ign:
1657 d.addCallback(lambda ign:
1658 self._add_server(server_number=1))
1659 d.addCallback(lambda ign:
1660 self._add_server(server_number=2))
1661 d.addCallback(lambda ign:
1662 self._add_server(server_number=3))
1663 d.addCallback(lambda ign:
1664 self._add_server(server_number=4))
1665 d.addCallback(_reset_encoding_parameters, happy=7)
1666 d.addCallback(lambda client:
1667 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1668 "shares could be placed on only 5 server(s) such "
1669 "that any 3 of them have enough shares to recover "
1670 "the file, but we were asked to place shares on "
1671 "at least 7 such servers.",
1672 client.upload, upload.Data("data" * 10000,
1674 # server 0: shares 0 - 9
1675 # server 1: share 0, read-only
1676 # server 2: share 0, read-only
1677 # server 3: share 0, read-only
1678 # This should place all of the shares, but fail with happy=4.
1679 # Since the number of servers with shares is more than the number
1680 # necessary to reconstitute the file, this will trigger a different
1681 # error message than either of those above.
1682 d.addCallback(_reset)
1683 d.addCallback(lambda ign:
1684 self._setup_and_upload())
1685 d.addCallback(lambda ign:
1686 self._add_server_with_share(server_number=1, share_number=0,
1688 d.addCallback(lambda ign:
1689 self._add_server_with_share(server_number=2, share_number=0,
1691 d.addCallback(lambda ign:
1692 self._add_server_with_share(server_number=3, share_number=0,
1694 d.addCallback(_reset_encoding_parameters, happy=7)
1695 d.addCallback(lambda client:
1696 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1697 "shares could be placed or found on 4 server(s), "
1698 "but they are not spread out evenly enough to "
1699 "ensure that any 3 of these servers would have "
1700 "enough shares to recover the file. We were asked "
1701 "to place shares on at least 7 servers such that "
1702 "any 3 of them have enough shares to recover the "
1704 client.upload, upload.Data("data" * 10000,
# Regression test for the share layout from ticket #778 comment:187, which
# broke an early redistribution algorithm; marked .todo below, so trial
# expects it to fail until the fix lands.
# NOTE(review): fused line numbers; gaps (1716, 1719-1720, 1722, 1724, 1726,
# 1729, 1733-1734, 1738) mark elided lines (including the '_setup(ign)'
# header that d.addCallback(_setup) references). Code unchanged.
1709 def test_problem_layout_comment_187(self):
1710 # #778 comment 187 broke an initial attempt at a share
1711 # redistribution algorithm. This test is here to demonstrate the
1712 # breakage, and to test that subsequent algorithms don't also
1713 # break in the same way.
1714 self.basedir = self.mktemp()
1715 d = self._setup_and_upload(k=2, n=3)
1717 # server 1: shares 0, 1, 2, readonly
1718 # server 2: share 0, readonly
1721 self._add_server_with_share(server_number=1, share_number=0,
1723 self._add_server_with_share(server_number=2, share_number=0,
1725 self._add_server_with_share(server_number=3, share_number=0)
1727 self._copy_share_to_server(1, 1)
1728 self._copy_share_to_server(2, 1)
1730 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1731 client = self.g.clients[0]
1732 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 3
1735 d.addCallback(_setup)
1736 d.addCallback(lambda client:
1737 client.upload(upload.Data("data" * 10000, convergence="")))
1739 test_problem_layout_comment_187.todo = "this isn't fixed yet"
# An upload that satisfies happy=4 but cannot place every share (four
# read-only one-share servers) must still be declared successful rather than
# failing over the leftover homeless shares.
# NOTE(review): fused line numbers; gaps (1758, 1760, 1762, 1764, 1766-1767,
# 1772, 1776+) mark elided lines (readonly=True closers, 'return c').
# Code left byte-identical.
1742 def test_upload_succeeds_with_some_homeless_shares(self):
1743 # If the upload is forced to stop trying to place shares before
1744 # it has placed (or otherwise accounted) for all of them, but it
1745 # has placed enough to satisfy the upload health criteria that
1746 # we're using, it should still succeed.
1747 self.basedir = self.mktemp()
1748 d = self._setup_and_upload()
1749 def _server_setup(ign):
1750 # Add four servers so that we have a layout like this:
1751 # server 1: share 0, read-only
1752 # server 2: share 1, read-only
1753 # server 3: share 2, read-only
1754 # server 4: share 3, read-only
1755 # If we set happy = 4, the upload will manage to satisfy
1756 # servers of happiness, but not place all of the shares; we
1757 # want to test that the upload is declared successful in
1759 self._add_server_with_share(server_number=1, share_number=0,
1761 self._add_server_with_share(server_number=2, share_number=1,
1763 self._add_server_with_share(server_number=3, share_number=2,
1765 self._add_server_with_share(server_number=4, share_number=3,
1768 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1769 # Set the client appropriately
1770 c = self.g.clients[0]
1771 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1773 d.addCallback(_server_setup)
1774 d.addCallback(lambda client:
1775 client.upload(upload.Data("data" * 10000, convergence="")))
# Redistribution must skip servers holding only a single share (moving those
# cannot raise happiness) and go on to a server that has shares worth moving.
# NOTE(review): fused line numbers; gaps (1783, 1797, 1803+) mark elided
# lines (including the loop header for the _copy_share_to_server call at
# 1798). Code left byte-identical.
1779 def test_uploader_skips_over_servers_with_only_one_share(self):
1780 # We want to make sure that the redistribution logic ignores
1781 # servers with only one share, since placing these shares
1782 # elsewhere will at best keep happiness the same as it was, and
1784 self.basedir = self.mktemp()
1785 d = self._setup_and_upload()
1786 def _server_setup(ign):
1787 # Add some servers so that the upload will need to
1788 # redistribute, but will first pass over a couple of servers
1789 # that don't have enough shares to redistribute before
1790 # finding one that does have shares to redistribute.
1791 self._add_server_with_share(server_number=1, share_number=0)
1792 self._add_server_with_share(server_number=2, share_number=2)
1793 self._add_server_with_share(server_number=3, share_number=1)
1794 self._add_server_with_share(server_number=8, share_number=4)
1795 self._add_server_with_share(server_number=5, share_number=5)
1796 self._add_server_with_share(server_number=10, share_number=7)
1798 self._copy_share_to_server(i, 2)
1799 return self.g.clients[0]
1800 d.addCallback(_server_setup)
1801 d.addCallback(lambda client:
1802 client.upload(upload.Data("data" * 10000, convergence="")))
# Test-fixture hook: rewrites the client's tahoe.cfg with shares.needed=7 /
# shares.total=12 before node startup.
# NOTE(review): fused line numbers; gaps (1810-1811) and the cut after 1814
# mean this method is truncated here (remaining writes / close / return are
# not visible). Code left byte-identical. The files opened on lines 1808 and
# 1809 are not closed within the visible lines — presumably handled in the
# elided tail; verify upstream.
1806 def _set_up_nodes_extra_config(self, clientdir):
1807 cfgfn = os.path.join(clientdir, "tahoe.cfg")
1808 oldcfg = open(cfgfn, "r").read()
1809 f = open(cfgfn, "wt")
1812 f.write("[client]\n")
1813 f.write("shares.needed = 7\n")
1814 f.write("shares.total = 12\n")
1820 # upload with exactly 75 peers (shares_of_happiness)
1821 # have a download fail
1822 # cancel a download (need to implement more cancel stuff)