3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.python.failure import Failure
6 from twisted.python import log
7 from twisted.internet import defer
8 from foolscap.api import fireEventually
10 import allmydata # for __full_version__
11 from allmydata import uri, monitor, client
12 from allmydata.immutable import upload, encode
13 from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
14 from allmydata.util.assertutil import precondition
15 from allmydata.util.deferredutil import DeferredListShouldSucceed
16 from no_network import GridTestMixin
17 from common_util import ShouldFailMixin
18 from allmydata.storage_client import StorageFarmBroker
19 from allmydata.storage.server import storage_index_to_dir
def extract_uri(results):
    """Return the URI string from an upload-results object.

    Used as a Deferred callback so that the _check_small/_check_large
    methods below receive the bare URI string rather than the results
    object. (The original one-line body was missing from this listing
    and has been restored.)
    """
    return results.uri
26 # Some of these took longer than 480 seconds on Zandr's arm box, but this may
27 # have been due to an earlier test ERROR'ing out due to timeout, which seems
28 # to screw up subsequent tests.
class Uploadable(unittest.TestCase):
    """Tests for the FileHandle/FileName/Data IUploadable wrappers.

    NOTE(review): this listing is missing a number of source lines
    (loop headers, setup code, `return d` statements); inline notes
    below flag the visible gaps.
    """
    def shouldEqual(self, data, expected):
        # `data` is expected to be a list of strings whose concatenation
        # equals `expected`.
        self.failUnless(isinstance(data, list))
        # NOTE(review): the `for e in data:` loop header and the join
        # producing `s` are missing from the listing.
        self.failUnless(isinstance(e, str))
        self.failUnlessEqual(s, expected)

    def test_filehandle_random_key(self):
        # convergence=None requests a random per-upload encryption key
        return self._test_filehandle(convergence=None)

    def test_filehandle_convergent_encryption(self):
        return self._test_filehandle(convergence="some convergence string")

    def _test_filehandle(self, convergence):
        # NOTE(review): the construction of the StringIO `s` and the
        # initial deferred `d` is missing from the listing.
        u = upload.FileHandle(s, convergence=convergence)
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
        d.addCallback(lambda res: s.close()) # that privilege is reserved for us

    def test_filename(self):
        basedir = "upload/Uploadable/test_filename"
        # NOTE(review): directory-creation line missing here.
        fn = os.path.join(basedir, "file")
        # NOTE(review): the lines writing test data into `fn` and
        # constructing `d` are missing from the listing.
        u = upload.FileName(fn, convergence=None)
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close())

    # NOTE(review): the `def` header for the following test (exercising
    # upload.Data the same way) is missing from the listing.
        u = upload.Data(s, convergence=None)
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close())
class ServerError(Exception):
    """Exception used by the fake storage servers in this module to
    simulate a server-side failure; tests assert that its name shows up
    in upload failure messages."""
def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
    """Install k-of-n encoding parameters (plus happiness and segment
    size) on the fake client stored at self.node.

    The interior dict lines were missing from this listing; they have
    been restored using the key set visible in
    FakeClient.DEFAULT_ENCODING_PARAMETERS ("k", "happy", "n",
    "max_segment_size").
    """
    p = {"k": k,
         "happy": happy,
         "n": n,
         "max_segment_size": max_segsize,
         }
    self.node.DEFAULT_ENCODING_PARAMETERS = p
class FakeStorageServer:
    """In-memory fake of a remote storage server; `mode` selects its
    behavior (the visible modes are "full", "first-fail", "second-fail",
    and "already got them").

    NOTE(review): several source lines are missing from this listing
    (attribute initialization, the eventual-send wrapper in callRemote,
    and the raise/return lines in allocate_buckets).
    """
    def __init__(self, mode):
        # NOTE(review): initialization of mode/queries/allocated and the
        # branch choosing between the two `self.version` dicts below are
        # missing from the listing.
        self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                         { "maximum-immutable-share-size": 2**32 },
                         "application-version": str(allmydata.__full_version__),
        self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                         { "maximum-immutable-share-size": 10 },
                         "application-version": str(allmydata.__full_version__),

    def callRemote(self, methname, *args, **kwargs):
        # Dispatch a fake "remote" call to the local method of the same
        # name. NOTE(review): the `_call` wrapper and deferred
        # construction (presumably via fireEventually, imported above)
        # are missing from the listing.
        meth = getattr(self, methname)
        return meth(*args, **kwargs)
        d.addCallback(lambda res: _call())

    def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
                         sharenums, share_size, canary):
        #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
        if self.mode == "first-fail":
            if self.queries == 0:
                # NOTE(review): the raise line is missing here.
        if self.mode == "second-fail":
            if self.queries == 1:
                # NOTE(review): the raise line is missing here.
        if self.mode == "full":
            # NOTE(review): the full-server return line is missing here.
        elif self.mode == "already got them":
            return (set(sharenums), {},)
        for shnum in sharenums:
            self.allocated.append( (storage_index, shnum) )
        # NOTE(review): the opening of the return tuple is missing; the
        # surviving lines map each sharenum to a FakeBucketWriter.
        dict([( shnum, FakeBucketWriter(share_size) )
              for shnum in sharenums]),
class FakeBucketWriter:
    # a diagnostic version of storageserver.BucketWriter
    def __init__(self, size):
        # NOTE(review): the lines storing `size` (as self._size) and the
        # `closed` flag are missing from this listing.
        self.data = StringIO()

    def callRemote(self, methname, *args, **kwargs):
        # Dispatch fake "remote" calls to the local remote_* methods.
        # NOTE(review): the `_call` wrapper and deferred construction are
        # missing from the listing.
        meth = getattr(self, "remote_" + methname)
        return meth(*args, **kwargs)
        d.addCallback(lambda res: _call())

    def remote_write(self, offset, data):
        # Writes must stay within the declared share size and may not
        # arrive after close.
        precondition(not self.closed)
        precondition(offset >= 0)
        precondition(offset+len(data) <= self._size,
                     "offset=%d + data=%d > size=%d" %
                     (offset, len(data), self._size))
        self.data.seek(offset)
        self.data.write(data)

    def remote_close(self):
        precondition(not self.closed)
        # NOTE(review): the line setting self.closed is missing here.

    def remote_abort(self):
        # abort is never expected in these tests, so record it loudly
        log.err(RuntimeError("uh oh, I was asked to abort"))
# Class attribute of FakeClient (the class header is missing from this
# listing): default encoding parameters with k=25 and 1 MiB segments.
# NOTE(review): the "happy"/"n" entries and the closing brace of this
# dict are missing from the listing.
DEFAULT_ENCODING_PARAMETERS = {"k":25,
                               "max_segment_size": 1*MiB,
def __init__(self, mode="good", num_servers=50):
    """Build a fake client backed by `num_servers` FakeStorageServers.

    `mode` is either a single mode string applied to every server, or a
    dict mapping server index -> mode string.
    """
    self.num_servers = num_servers
    if type(mode) is str:
        # expand a single mode string into a per-server mapping
        mode = dict((i, mode) for i in range(num_servers))
    servers = []
    for idx in range(self.num_servers):
        servers.append(("%20d" % idx, FakeStorageServer(mode[idx])))
    self.storage_broker = StorageFarmBroker(None, permute_peers=True)
    for peerid, srv in servers:
        self.storage_broker.test_add_server(peerid, srv)
    # remember the server objects so tests can inspect allocations
    self.last_peers = [entry[1] for entry in servers]
def log(self, *args, **kwargs):
    # NOTE(review): the body of this method is missing from the listing.

def get_encoding_parameters(self):
    # Return the class-level defaults; tests override them via
    # SetDEPMixin.set_encoding_parameters.
    return self.DEFAULT_ENCODING_PARAMETERS

def get_storage_broker(self):
    return self.storage_broker

# Class-level secret holder shared by every FakeClient instance.
_secret_holder = client.SecretHolder("lease secret", "convergence secret")
class GotTooFarError(Exception):
    """Raised by GiganticUploadable.read once more than ~1MB has been
    read, to terminate the too-large-file tests early."""
class GiganticUploadable(upload.FileHandle):
    """An uploadable that pretends to be `size` bytes of zeros without
    allocating them, and aborts the test if more than ~1MB is actually
    read."""
    def __init__(self, size):
        # NOTE(review): the lines storing the size (self._size) and the
        # read pointer (self._fp) are missing from this listing.

    def get_encryption_key(self):
        # fixed all-zeros 16-byte AES key
        return defer.succeed("\x00" * 16)

    # NOTE(review): the `def get_size(self):`-style header for the
    # following return is missing from the listing.
        return defer.succeed(self._size)

    def read(self, length):
        left = self._size - self._fp
        length = min(left, length)
        # NOTE(review): the line advancing self._fp is missing here.
        if self._fp > 1000000:
            # terminate the test early.
            raise GotTooFarError("we shouldn't be allowed to get this far")
        return defer.succeed(["\x00" * length])
225 Once upon a time, there was a beautiful princess named Buttercup. She lived
226 in a magical land where every file was stored securely among millions of
227 machines, and nobody ever worried about their data being lost ever again.
# NOTE(review): the opening of the DATA = """...""" literal is missing
# from this listing; the prose above is the interior of that string.
# DATA must be large enough that it is not stored as a literal (LIT) URI.
assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD

# NOTE(review): the SIZE_ZERO/SIZE_SMALL definitions referenced by the
# tests below are missing from the listing.
SIZE_LARGE = len(DATA)
def upload_data(uploader, data):
    """Upload an in-memory byte string through `uploader`, returning the
    uploader's deferred result."""
    return uploader.upload(upload.Data(data, convergence=None))
def upload_filename(uploader, filename):
    """Upload the file at `filename` through `uploader`, returning the
    uploader's deferred result."""
    return uploader.upload(upload.FileName(filename, convergence=None))
def upload_filehandle(uploader, fh):
    """Upload the contents of open filehandle `fh` through `uploader`,
    returning the uploader's deferred result."""
    return uploader.upload(upload.FileHandle(fh, convergence=None))
class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
    """Upload tests against a grid of well-behaved fake servers.

    NOTE(review): this listing is missing a number of source lines
    (the setUp header, get_data's body, several `return d` statements,
    and the file-writing setup in the test_filename_* methods).
    """
    # NOTE(review): the `def setUp(self):` header is missing here.
        self.node = FakeClient(mode="good")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _check_small(self, newuri, size):
        # small uploads must be stored inline as literal (LIT) URIs
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.LiteralFileURI))
        self.failUnlessEqual(len(u.data), size)

    def _check_large(self, newuri, size):
        # large uploads must produce CHK URIs with 16-byte SI and key
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def get_data(self, size):
        # NOTE(review): body missing from the listing; callers expect a
        # string of length `size`.

    def test_too_large(self):
        # we've removed the 4GiB share size limit (see ticket #346 for
        # details), but still have an 8-byte field, so the limit is now
        # 2**64, so make sure we reject files larger than that.
        k = 3; happy = 7; n = 10
        self.set_encoding_parameters(k, happy, n)
        # NOTE(review): the definition of `big` is missing from the listing.
        data1 = GiganticUploadable(big)
        d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
                            "This file is too large to be uploaded (data_size)",
                            self.u.upload, data1)
        data2 = GiganticUploadable(big-3)
        d.addCallback(lambda res:
                      self.shouldFail(FileTooLargeError,
                                      "test_too_large-data2",
                                      "This file is too large to be uploaded (offsets)",
                                      self.u.upload, data2))
        # I don't know where the actual limit is.. it depends upon how large
        # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).

    def test_data_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        # NOTE(review): `return d` is missing from the listing (here and
        # in the sibling tests below).

    def test_data_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)

    def test_data_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)

    def test_data_large_odd_segments(self):
        data = self.get_data(SIZE_LARGE)
        segsize = int(SIZE_LARGE / 2.5)
        # we want 3 segments, since that's not a power of two
        self.set_encoding_parameters(25, 75, 100, segsize)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)

    def test_filehandle_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)

    def test_filehandle_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)

    def test_filehandle_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)

    def test_filename_zero(self):
        fn = "Uploader-test_filename_zero.data"
        data = self.get_data(SIZE_ZERO)
        # NOTE(review): the lines writing `data` into `fn` are missing
        # from the listing (here and in the two tests below).
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)

    def test_filename_small(self):
        fn = "Uploader-test_filename_small.data"
        data = self.get_data(SIZE_SMALL)
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)

    def test_filename_large(self):
        fn = "Uploader-test_filename_large.data"
        data = self.get_data(SIZE_LARGE)
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
    """Upload behavior when some or all fake servers raise errors.

    NOTE(review): a few source lines are missing from this listing
    (a make_node call, the `def _check(f):` headers, and the trailing
    `return d` statements).
    """
    def make_node(self, mode, num_servers=10):
        self.node = FakeClient(mode, num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _check_large(self, newuri, size):
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def test_first_error(self):
        # one good server is enough to succeed despite first-query failures
        mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
        # NOTE(review): the `self.make_node(mode)` line is missing here.
        d = upload_data(self.u, DATA)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)

    def test_first_error_all(self):
        self.make_node("first-fail")
        d = self.shouldFail(UploadUnhappinessError, "first_error_all",
                            "peer selection failed",
                            upload_data, self.u, DATA)
        # NOTE(review): the `def _check(f):` header is missing here.
            self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
            # there should also be a 'last failure was' message
            self.failUnlessIn("ServerError", str(f.value))
        d.addCallback(_check)

    def test_second_error(self):
        # we want to make sure we make it to a third pass. This means that
        # the first pass was insufficient to place all shares, and at least
        # one of second pass servers (other than the last one) accepted a
        # share (so we'll believe that a third pass will be useful). (if
        # everyone but the last server throws an error, then we'll send all
        # the remaining shares to the last server at the end of the second
        # pass, and if that succeeds, we won't make it to a third pass).
        #
        # we can achieve this 97.5% of the time by using 40 servers, having
        # 39 of them fail on the second request, leaving only one to succeed
        # on the second request. (we need to keep the number of servers low
        # enough to ensure a second pass with 100 shares).
        mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
        self.make_node(mode, 40)
        d = upload_data(self.u, DATA)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)

    def test_second_error_all(self):
        self.make_node("second-fail")
        d = self.shouldFail(UploadUnhappinessError, "second_error_all",
                            "peer selection failed",
                            upload_data, self.u, DATA)
        # NOTE(review): the `def _check(f):` header is missing here.
            self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
            # there should also be a 'last failure was' message
            self.failUnlessIn("ServerError", str(f.value))
        d.addCallback(_check)
class FullServer(unittest.TestCase):
    """An upload against servers that are all full must fail with
    UploadUnhappinessError."""
    # NOTE(review): the `def setUp(self):` header is missing here.
        self.node = FakeClient(mode="full")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _should_fail(self, f):
        # the errback must deliver an UploadUnhappinessError
        self.failUnless(isinstance(f, Failure) and f.check(UploadUnhappinessError), f)

    def test_data_large(self):
        # NOTE(review): the line assigning `data` is missing from the
        # listing, as is the trailing `return d`.
        d = upload_data(self.u, data)
        d.addBoth(self._should_fail)
class PeerSelection(unittest.TestCase):
    """Tests that shares are distributed evenly across the fake servers.

    NOTE(review): several lines are missing from this listing (the
    make_client() calls at the top of each test, the `def _check(res):`
    headers, and the trailing `return d` statements).
    """
    def make_client(self, num_servers=50):
        self.node = FakeClient(mode="good", num_servers=num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def get_data(self, size):
        # NOTE(review): body missing from the listing.

    def _check_large(self, newuri, size):
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        # NOTE(review): the opening lines of the parameter dict `p` are
        # missing from the listing.
             "max_segment_size": max_segsize,
        self.node.DEFAULT_ENCODING_PARAMETERS = p

    def test_one_each(self):
        # if we have 50 shares, and there are 50 peers, and they all accept a
        # share, we should get exactly one share per peer
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(25, 30, 50)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        # NOTE(review): the `def _check(res):` header is missing here.
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 1)
                self.failUnlessEqual(p.queries, 1)
        d.addCallback(_check)

    def test_two_each(self):
        # if we have 100 shares, and there are 50 peers, and they all accept
        # all shares, we should get exactly two shares per peer
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(50, 75, 100)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        # NOTE(review): the `def _check(res):` header is missing here.
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 2)
                self.failUnlessEqual(p.queries, 2)
        d.addCallback(_check)

    def test_one_each_plus_one_extra(self):
        # if we have 51 shares, and there are 50 peers, then one peer gets
        # two shares and the rest get just one
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(24, 41, 51)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        # NOTE(review): the `def _check(res):` header and the got_one /
        # got_two list setup are missing here.
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnless(len(allocated) in (1,2), len(allocated))
                if len(allocated) == 1:
                    self.failUnlessEqual(p.queries, 1)
                    # NOTE(review): the got_one.append line and the
                    # `else:` branch header are missing here.
                    self.failUnlessEqual(p.queries, 2)
            self.failUnlessEqual(len(got_one), 49)
            self.failUnlessEqual(len(got_two), 1)
        d.addCallback(_check)

    def test_four_each(self):
        # if we have 200 shares, and there are 50 peers, then each peer gets
        # 4 shares. The design goal is to accomplish this with only two
        # NOTE(review): the continuation of this comment is missing.
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(100, 150, 200)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        # NOTE(review): the `def _check(res):` header is missing here.
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 4)
                self.failUnlessEqual(p.queries, 2)
        d.addCallback(_check)

    def test_three_of_ten(self):
        # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
        # NOTE(review): the continuation of this comment is missing.
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 5, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        # NOTE(review): the `def _check(res):` header and the
        # `counts = {}` initialization are missing here.
            for p in self.node.last_peers:
                allocated = p.allocated
                counts[len(allocated)] = counts.get(len(allocated), 0) + 1
            histogram = [counts.get(i, 0) for i in range(5)]
            self.failUnlessEqual(histogram, [0,0,0,2,1])
        d.addCallback(_check)

    def test_some_big_some_small(self):
        # 10 shares, 20 servers, but half the servers don't support a
        # share-size large enough for our file
        mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)])
        self.node = FakeClient(mode, num_servers=20)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 5, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        # we should have put one share each on the big peers, and zero
        # shares on the small peers
        # NOTE(review): the `def _check(res):` header, total_allocated
        # initialization, and the `if p.mode == "good":` line are missing.
            for p in self.node.last_peers:
                    self.failUnlessEqual(len(p.allocated), 1)
                elif p.mode == "small":
                    self.failUnlessEqual(len(p.allocated), 0)
                total_allocated += len(p.allocated)
            self.failUnlessEqual(total_allocated, 10)
        d.addCallback(_check)
class StorageIndex(unittest.TestCase):
    """Storage-index derivation must depend on the convergence string,
    random-vs-convergent key mode, and the encoding parameters."""
    def test_params_must_matter(self):
        DATA = "I am some data"
        u = upload.Data(DATA, convergence="")
        eu = upload.EncryptAnUploadable(u)
        d1 = eu.get_storage_index()

        # CHK means the same data should encrypt the same way
        u = upload.Data(DATA, convergence="")
        eu = upload.EncryptAnUploadable(u)
        d1a = eu.get_storage_index()

        # but if we use a different convergence string it should be different
        u = upload.Data(DATA, convergence="wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt1 = eu.get_storage_index()

        # and if we add yet a different convergence it should be different again
        u = upload.Data(DATA, convergence="NOT wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt2 = eu.get_storage_index()

        # and if we use the first string again it should be the same as last time
        u = upload.Data(DATA, convergence="wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt1a = eu.get_storage_index()

        # and if we change the encoding parameters, it should be different (from the same convergence string with different encoding parameters)
        u = upload.Data(DATA, convergence="")
        u.encoding_param_k = u.default_encoding_param_k + 1
        eu = upload.EncryptAnUploadable(u)
        d2 = eu.get_storage_index()

        # and if we use a random key, it should be different than the CHK
        u = upload.Data(DATA, convergence=None)
        eu = upload.EncryptAnUploadable(u)
        d3 = eu.get_storage_index()
        # and different from another instance
        u = upload.Data(DATA, convergence=None)
        eu = upload.EncryptAnUploadable(u)
        d4 = eu.get_storage_index()

        d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
        # NOTE(review): the `def _check(res):` header is missing from
        # this listing.
            si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
            self.failUnlessEqual(si1, si1a)
            self.failIfEqual(si1, si2)
            self.failIfEqual(si1, si3)
            self.failIfEqual(si1, si4)
            self.failIfEqual(si3, si4)
            self.failIfEqual(si1salt1, si1)
            self.failIfEqual(si1salt1, si1salt2)
            self.failIfEqual(si1salt2, si1)
            self.failUnlessEqual(si1salt1, si1salt1a)
        # NOTE(review): the d.addCallback(_check) / `return d` lines are
        # missing from the listing.
676 class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
def _do_upload_with_broken_servers(self, servers_to_break):
    """
    I act like a normal upload, but before I send the results of
    Tahoe2PeerSelector to the Encoder, I break the first servers_to_break
    PeerTrackers in the used_peers part of the return result.
    """
    assert self.g, "I tried to find a grid at self.g, but failed"
    broker = self.g.clients[0].storage_broker
    sh = self.g.clients[0]._secret_holder
    data = upload.Data("data" * 10000, convergence="")
    data.encoding_param_k = 3
    data.encoding_param_happy = 4
    data.encoding_param_n = 10
    uploadable = upload.EncryptAnUploadable(data)
    encoder = encode.Encoder()
    encoder.set_encrypted_uploadable(uploadable)
    status = upload.UploadStatus()
    selector = upload.Tahoe2PeerSelector("dglev", "test", status)
    storage_index = encoder.get_param("storage_index")
    share_size = encoder.get_param("share_size")
    block_size = encoder.get_param("block_size")
    num_segments = encoder.get_param("num_segments")
    d = selector.get_shareholders(broker, sh, storage_index,
                                  share_size, block_size, num_segments,
                                  # NOTE(review): the final argument line
                                  # of this call is missing.
    def _have_shareholders((used_peers, already_peers)):
        assert servers_to_break <= len(used_peers)
        # abort the buckets of the first `servers_to_break` trackers
        for index in xrange(servers_to_break):
            server = list(used_peers)[index]
            for share in server.buckets.keys():
                server.buckets[share].abort()
        # NOTE(review): the `buckets` dict initialization is missing here.
        servermap = already_peers.copy()
        for peer in used_peers:
            buckets.update(peer.buckets)
            for bucket in peer.buckets:
                servermap[bucket] = peer.peerid
        encoder.set_shareholders(buckets, servermap)
        # NOTE(review): the return/encoder-start lines are missing here.
    d.addCallback(_have_shareholders)
    # NOTE(review): the trailing `return d` is missing from the listing.
def _add_server(self, server_number, readonly=False):
    """Create storage server `server_number` (optionally read-only) and
    register it with the NoNetworkGrid at self.g."""
    assert self.g, "I tried to find a grid at self.g, but failed"
    assert self.shares, "I tried to find shares at self.shares, but failed"
    new_server = self.g.make_server(server_number, readonly)
    self.g.add_server(server_number, new_server)
def _add_server_with_share(self, server_number, share_number=None,
                           # NOTE(review): the remainder of this
                           # signature (a readonly parameter, per the
                           # body below) is missing from the listing.
    # Add a server, then optionally copy one of our shares onto it.
    self._add_server(server_number, readonly)
    if share_number is not None:
        self._copy_share_to_server(share_number, server_number)
def _copy_share_to_server(self, share_number, server_number):
    """Copy share `share_number` from the original upload into the
    share directory of server `server_number`."""
    ss = self.g.servers_by_number[server_number]
    # Copy share i from the directory associated with the first
    # storage server to the directory associated with this one.
    assert self.g, "I tried to find a grid at self.g, but failed"
    assert self.shares, "I tried to find shares at self.shares, but failed"
    old_share_location = self.shares[share_number][2]
    new_share_location = os.path.join(ss.storedir, "shares")
    si = uri.from_string(self.uri).get_storage_index()
    new_share_location = os.path.join(new_share_location,
                                      storage_index_to_dir(si))
    if not os.path.exists(new_share_location):
        os.makedirs(new_share_location)
    new_share_location = os.path.join(new_share_location,
                                      # NOTE(review): the filename
                                      # argument line is missing here.
    if old_share_location != new_share_location:
        shutil.copy(old_share_location, new_share_location)
    shares = self.find_shares(self.uri)
    # Make sure that the storage server has the share.
    self.failUnless((share_number, ss.my_nodeid, new_share_location)
                    # NOTE(review): the `in shares)` continuation is
                    # missing from the listing.
def _setup_and_upload(self):
    """
    I set up a NoNetworkGrid with a single server and client,
    upload a file to it, store its uri in self.uri, and store its
    sharedata in self.shares.
    """
    self.set_up_grid(num_clients=1, num_servers=1)
    client = self.g.clients[0]
    client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
    data = upload.Data("data" * 10000, convergence="")
    d = client.upload(data)
    # NOTE(review): the `_store_uri` callback definition (setting
    # self.uri) is missing from the listing.
    d.addCallback(_store_uri)
    d.addCallback(lambda ign:
                  self.find_shares(self.uri))
    def _store_shares(shares):
        # NOTE(review): the body (storing self.shares) is missing.
    d.addCallback(_store_shares)
    # NOTE(review): the trailing `return d` is missing from the listing.
def test_configure_parameters(self):
    # Verify that encoding parameters supplied via the client config
    # hook (k=7, N=12 per the assertions below) are actually used.
    self.basedir = self.mktemp()
    hooks = {0: self._set_up_nodes_extra_config}
    self.set_up_grid(client_config_hooks=hooks)
    c0 = self.g.clients[0]
    # NOTE(review): the DATA definition and the line starting the upload
    # (producing `d`) are missing from the listing.
    u = upload.Data(DATA, convergence="")
    d.addCallback(lambda ur: c0.create_node_from_uri(ur.uri))
    m = monitor.Monitor()
    d.addCallback(lambda fn: fn.check(m))
    # NOTE(review): the `_check` callback header (extracting `data` from
    # the check results) is missing from the listing.
        self.failUnlessEqual(data["count-shares-needed"], 7)
        self.failUnlessEqual(data["count-shares-expected"], 12)
    d.addCallback(_check)
    # NOTE(review): the trailing `return d` is missing from the listing.
def _setUp(self, ns):
    # Used by test_happy_semantics and test_prexisting_share_behavior
    # to set up the grid: a "good"-mode fake client with `ns` servers
    # and a running Uploader attached to it.
    self.node = FakeClient(mode="good", num_servers=ns)
    uploader = upload.Uploader()
    uploader.running = True
    uploader.parent = self.node
    self.u = uploader
def test_happy_semantics(self):
    # NOTE(review): many lines are missing from this listing — the
    # initial _setUp call, the upload calls that are the arguments of
    # the dangling addCallback lambdas below, and the trailing
    # `return d`.
    DATA = upload.Data("kittens" * 10000, convergence="")
    # These parameters are unsatisfiable with the client that we've made
    # -- we'll use them to test that the semnatics work correctly.
    self.set_encoding_parameters(k=3, happy=5, n=10)
    d = self.shouldFail(UploadUnhappinessError, "test_happy_semantics",
                        "shares could only be placed on 2 servers "
                        "(5 were requested)",
                        # NOTE(review): the upload-call argument line is
                        # missing here.
    # Let's reset the client to have 10 servers
    d.addCallback(lambda ign:
                  # NOTE(review): the _setUp-style call is missing here.
    # These parameters are satisfiable with the client we've made.
    d.addCallback(lambda ign:
                  self.set_encoding_parameters(k=3, happy=5, n=10))
    # NOTE(review): an upload step is missing here.
    d.addCallback(lambda ign:
                  # NOTE(review): the upload call is missing here.
    # Let's reset the client to have 7 servers
    # (this is less than n, but more than h)
    d.addCallback(lambda ign:
                  # NOTE(review): the _setUp-style call is missing here.
    # These encoding parameters should still be satisfiable with our
    # NOTE(review): the continuation of this comment is missing.
    d.addCallback(lambda ign:
                  self.set_encoding_parameters(k=3, happy=5, n=10))
    # This, then, should work.
    d.addCallback(lambda ign:
                  # NOTE(review): the final upload call is missing here.
def test_problem_layout_comment_52(self):
    # NOTE(review): many lines are missing from this listing; the
    # dangling lambdas and truncated calls below lost their argument
    # lines. Inline notes mark the visible gaps.
    self.basedir = self.mktemp()
    # This scenario is at
    # http://allmydata.org/trac/tahoe/ticket/778#comment:52
    #
    # The scenario in comment:52 proposes that we have a layout
    # server 4: shares 2 - 10
    # To get access to the shares, we will first upload to one
    # server, which will then have shares 1 - 10. We'll then
    # add three new servers, configure them to not accept any new
    # shares, then write share 1 directly into the serverdir of each.
    # Then each of servers 1 - 3 will report that they have share 1,
    # and will not accept any new share, while server 4 will report that
    # it has shares 2 - 10 and will accept new shares.
    # We'll then set 'happy' = 4, and see that an upload fails
    d = self._setup_and_upload()
    d.addCallback(lambda ign:
                  self._add_server_with_share(1, 0, True))
    d.addCallback(lambda ign:
                  self._add_server_with_share(2, 0, True))
    d.addCallback(lambda ign:
                  self._add_server_with_share(3, 0, True))
    # Remove the first share from server 0.
    def _remove_share_0():
        share_location = self.shares[0][2]
        os.remove(share_location)
    d.addCallback(lambda ign:
                  # NOTE(review): the _remove_share_0() call is missing.
    # Set happy = 4 in the client.
    # NOTE(review): the callback header wrapping the next two lines is
    # missing from the listing.
        client = self.g.clients[0]
        client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
        # NOTE(review): the `return client` line is missing here.
    d.addCallback(lambda ign:
                  # NOTE(review): the callback invocation is missing.
    # Uploading data should fail
    d.addCallback(lambda client:
                  self.shouldFail(UploadUnhappinessError, "test_happy_semantics",
                                  "shares could only be placed on 2 servers "
                                  "(4 were requested)",
                                  client.upload, upload.Data("data" * 10000,
                                  # NOTE(review): the convergence argument
                                  # line is missing here.

    # Do comment:52, but like this:
    # server 3: share 0, read-only
    # server 1: share 0, read-only
    # server 0: shares 0-9
    d.addCallback(lambda ign:
                  # NOTE(review): a basedir-reset call is missing here.
    d.addCallback(lambda ign:
                  self._setup_and_upload())
    d.addCallback(lambda ign:
                  self._add_server_with_share(server_number=2))
    d.addCallback(lambda ign:
                  self._add_server_with_share(server_number=3, share_number=0,
                  # NOTE(review): the readonly argument line is missing.
    d.addCallback(lambda ign:
                  self._add_server_with_share(server_number=1, share_number=0,
    # NOTE(review): a callback header wrapping the next two lines is
    # missing from the listing.
        client = self.g.clients[0]
        client.DEFAULT_ENCODING_PARAMETERS['happy'] = 3
        # NOTE(review): the `return client` line is missing here.
    d.addCallback(lambda ign:
                  # NOTE(review): the callback invocation is missing.
    d.addCallback(lambda client:
                  self.shouldFail(UploadUnhappinessError, "test_happy_sematics",
                                  "shares could only be placed on 2 servers "
                                  "(3 were requested)",
                                  client.upload, upload.Data("data" * 10000,
                                  # NOTE(review): the convergence argument
                                  # and the trailing `return d` are missing.
def test_problem_layout_comment_53(self):
    # This scenario is at
    # http://allmydata.org/trac/tahoe/ticket/778#comment:53
    #
    # NOTE(review): a handful of lines are missing from this listing;
    # inline notes mark the visible gaps.
    # Set up the grid to have one server
    def _change_basedir(ign):
        self.basedir = self.mktemp()
    _change_basedir(None)
    d = self._setup_and_upload()
    # We start by uploading all of the shares to one server (which has
    # already been done above).
    # Next, we'll add three new servers to our NoNetworkGrid. We'll add
    # one share from our initial upload to each of these.
    # The counterintuitive ordering of the share numbers is to deal with
    # the permuting of these servers -- distributing the shares this
    # way ensures that the Tahoe2PeerSelector sees them in the order
    # NOTE(review): the continuation of this comment is missing.
    d.addCallback(lambda ign:
                  self._add_server_with_share(server_number=1, share_number=2))
    d.addCallback(lambda ign:
                  self._add_server_with_share(server_number=2, share_number=0))
    d.addCallback(lambda ign:
                  self._add_server_with_share(server_number=3, share_number=1))
    # So, we now have the following layout:
    # server 0: shares 0 - 9
    # NOTE(review): the layout comment lines for servers 1-3 are missing.
    # We want to change the 'happy' parameter in the client to 4.
    # The Tahoe2PeerSelector will see the peers permuted as:
    # NOTE(review): the permutation comment line is missing.
    # Ideally, a reupload of our original data should work.
    def _reset_encoding_parameters(ign, happy=4):
        client = self.g.clients[0]
        client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
        # NOTE(review): the `return client` line is missing here.
    d.addCallback(_reset_encoding_parameters)
    d.addCallback(lambda client:
                  client.upload(upload.Data("data" * 10000, convergence="")))

    # This scenario is basically comment:53, but with the order reversed;
    # this means that the Tahoe2PeerSelector sees
    # server 2: shares 1-10
    # NOTE(review): further layout comment lines are missing.
    d.addCallback(_change_basedir)
    d.addCallback(lambda ign:
                  self._setup_and_upload())
    d.addCallback(lambda ign:
                  self._add_server_with_share(server_number=2, share_number=0))
    d.addCallback(lambda ign:
                  self._add_server_with_share(server_number=3, share_number=1))
    d.addCallback(lambda ign:
                  self._add_server_with_share(server_number=1, share_number=2))
    # Copy all of the other shares to server number 2
    def _copy_shares(ign):
        for i in xrange(1, 10):
            self._copy_share_to_server(i, 2)
    d.addCallback(_copy_shares)
    # Remove the first server, and add a placeholder with share 0
    d.addCallback(lambda ign:
                  self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
    d.addCallback(lambda ign:
                  self._add_server_with_share(server_number=4, share_number=0))
    d.addCallback(_reset_encoding_parameters)
    d.addCallback(lambda client:
                  client.upload(upload.Data("data" * 10000, convergence="")))
    # Try the same thing, but with empty servers after the first one
    # We want to make sure that Tahoe2PeerSelector will redistribute
    # shares as necessary, not simply discover an existing layout.
    d.addCallback(_change_basedir)
    d.addCallback(lambda ign:
                  self._setup_and_upload())
    d.addCallback(lambda ign:
                  self._add_server(server_number=2))
    d.addCallback(lambda ign:
                  self._add_server(server_number=3))
    d.addCallback(lambda ign:
                  self._add_server(server_number=1))
    d.addCallback(_copy_shares)
    d.addCallback(lambda ign:
                  self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
    d.addCallback(lambda ign:
                  self._add_server(server_number=4))
    d.addCallback(_reset_encoding_parameters)
    d.addCallback(lambda client:
                  client.upload(upload.Data("data" * 10000, convergence="")))
    # NOTE(review): the trailing `return d` is missing from the listing.
def test_happiness_with_some_readonly_peers(self):
    # Try the following layout
    # server 2: shares 0-9
    # server 4: share 0, read-only
    # server 3: share 1, read-only
    # server 1: share 2, read-only
    # NOTE(review): several continuation lines are missing from this
    # listing; inline notes mark the gaps.
    self.basedir = self.mktemp()
    d = self._setup_and_upload()
    d.addCallback(lambda ign:
                  self._add_server_with_share(server_number=2, share_number=0))
    d.addCallback(lambda ign:
                  self._add_server_with_share(server_number=3, share_number=1,
                  # NOTE(review): the readonly argument line is missing.
    d.addCallback(lambda ign:
                  self._add_server_with_share(server_number=1, share_number=2,
                  # NOTE(review): the readonly argument line is missing.
    # Copy all of the other shares to server number 2
    def _copy_shares(ign):
        for i in xrange(1, 10):
            self._copy_share_to_server(i, 2)
    d.addCallback(_copy_shares)
    # Remove server 0, and add another in its place
    d.addCallback(lambda ign:
                  self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
    d.addCallback(lambda ign:
                  self._add_server_with_share(server_number=4, share_number=0,
                  # NOTE(review): the readonly argument line is missing.
    def _reset_encoding_parameters(ign, happy=4):
        client = self.g.clients[0]
        client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
        # NOTE(review): the `return client` line is missing here.
    d.addCallback(_reset_encoding_parameters)
    d.addCallback(lambda client:
                  client.upload(upload.Data("data" * 10000, convergence="")))
    # NOTE(review): the trailing `return d` is missing from the listing.
1056 def test_happiness_with_all_readonly_peers(self):
# Every candidate server is read-only; the shares already in place must
# by themselves satisfy servers-of-happiness = 4.
1057 # server 3: share 1, read-only
1058 # server 1: share 2, read-only
1059 # server 2: shares 0-9, read-only
1060 # server 4: share 0, read-only
1061 # The idea with this test is to make sure that the survey of
1062 # read-only peers doesn't undercount servers of happiness
1063 self.basedir = self.mktemp()
1064 d = self._setup_and_upload()
1065 d.addCallback(lambda ign:
1066 self._add_server_with_share(server_number=4, share_number=0,
# NOTE(review): continuation line missing in this copy -- presumably
# readonly=True, per the layout comment above; confirm.
1068 d.addCallback(lambda ign:
1069 self._add_server_with_share(server_number=3, share_number=1,
# NOTE(review): continuation line missing -- presumably readonly=True.
1071 d.addCallback(lambda ign:
1072 self._add_server_with_share(server_number=1, share_number=2,
# NOTE(review): continuation line missing -- presumably readonly=True.
1074 d.addCallback(lambda ign:
1075 self._add_server_with_share(server_number=2, share_number=0,
# NOTE(review): continuation line missing -- presumably readonly=True.
1077 def _copy_shares(ign):
# Give server 2 the remaining shares so it holds the complete set.
1078 for i in xrange(1, 10):
1079 self._copy_share_to_server(i, 2)
1080 d.addCallback(_copy_shares)
1081 d.addCallback(lambda ign:
1082 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1083 def _reset_encoding_parameters(ign, happy=4):
1084 client = self.g.clients[0]
1085 client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
# NOTE(review): 'return client' appears to be missing here (the next
# callback takes the client as its argument).
1087 d.addCallback(_reset_encoding_parameters)
1088 d.addCallback(lambda client:
1089 client.upload(upload.Data("data" * 10000, convergence="")))
# NOTE(review): trailing 'return d' appears to be missing.
1093 def test_dropped_servers_in_encoder(self):
# Exercise the encoder's handling of servers dropping out mid-upload:
# with happy=4, losing one of five extra servers still succeeds, while
# losing two raises UploadUnhappinessError ("still have 3, want 4").
# Both scenarios are then repeated with two of the servers read-only to
# check that happiness accounting survives the read-only bookkeeping.
1094 def _set_basedir(ign=None):
1095 self.basedir = self.mktemp()
# NOTE(review): a line appears to be missing here -- presumably the
# initial '_set_basedir()' call that sets self.basedir before the
# first _setup_and_upload(); confirm against version control.
1097 d = self._setup_and_upload();
1099 def _do_server_setup(ign):
# Five additional writable servers, each primed with one share.
1100 self._add_server_with_share(1)
1101 self._add_server_with_share(2)
1102 self._add_server_with_share(3)
1103 self._add_server_with_share(4)
1104 self._add_server_with_share(5)
1105 d.addCallback(_do_server_setup)
1106 # remove the original server
1107 # (necessary to ensure that the Tahoe2PeerSelector will distribute
# NOTE(review): the tail of this comment is missing in this copy.
1109 def _remove_server(ign):
1110 server = self.g.servers_by_number[0]
1111 self.g.remove_server(server.my_nodeid)
1112 d.addCallback(_remove_server)
1113 # This should succeed.
1114 d.addCallback(lambda ign:
1115 self._do_upload_with_broken_servers(1))
1116 # Now, do the same thing over again, but drop 2 servers instead
1117 # of 1. This should fail.
1118 d.addCallback(_set_basedir)
1119 d.addCallback(lambda ign:
1120 self._setup_and_upload())
1121 d.addCallback(_do_server_setup)
1122 d.addCallback(_remove_server)
1123 d.addCallback(lambda ign:
1124 self.shouldFail(UploadUnhappinessError,
1125 "test_dropped_servers_in_encoder",
1126 "lost too many servers during upload "
1127 "(still have 3, want 4)",
1128 self._do_upload_with_broken_servers, 2))
1129 # Now do the same thing over again, but make some of the servers
1130 # readonly, break some of the ones that aren't, and make sure that
1131 # happiness accounting is preserved.
1132 d.addCallback(_set_basedir)
1133 d.addCallback(lambda ign:
1134 self._setup_and_upload())
1135 def _do_server_setup_2(ign):
# Like _do_server_setup, but servers 4 and 5 are read-only and hold
# shares 7 and 8 respectively.
1136 self._add_server_with_share(1)
1137 self._add_server_with_share(2)
1138 self._add_server_with_share(3)
1139 self._add_server_with_share(4, 7, readonly=True)
1140 self._add_server_with_share(5, 8, readonly=True)
1141 d.addCallback(_do_server_setup_2)
1142 d.addCallback(_remove_server)
1143 d.addCallback(lambda ign:
1144 self._do_upload_with_broken_servers(1))
1145 d.addCallback(_set_basedir)
1146 d.addCallback(lambda ign:
1147 self._setup_and_upload())
1148 d.addCallback(_do_server_setup_2)
1149 d.addCallback(_remove_server)
1150 d.addCallback(lambda ign:
1151 self.shouldFail(UploadUnhappinessError,
1152 "test_dropped_servers_in_encoder",
1153 "lost too many servers during upload "
1154 "(still have 3, want 4)",
1155 self._do_upload_with_broken_servers, 2))
# NOTE(review): trailing 'return d' appears to be missing.
1159 def test_servers_with_unique_shares(self):
# Unit test for the upload.servers_with_unique_shares() helper, which
# counts distinct servers across a shnum->peerid dict of existing
# shares plus an optional set of peer-tracker objects.
1160 # servers_with_unique_shares expects a dict of
1161 # shnum => peerid as a preexisting shares argument.
# NOTE(review): the construction of 'test1' is missing from this copy;
# from the assertions below it maps four shnums to "server1".."server4".
1168 unique_servers = upload.servers_with_unique_shares(test1)
1169 self.failUnlessEqual(4, len(unique_servers))
1170 for server in ["server1", "server2", "server3", "server4"]:
1171 self.failUnlessIn(server, unique_servers)
1172 test1[4] = "server1"
# Share 4 now duplicates server1, collapsing server4 out of the count.
1173 # Now there should only be 3 unique servers.
1174 unique_servers = upload.servers_with_unique_shares(test1)
1175 self.failUnlessEqual(3, len(unique_servers))
1176 for server in ["server1", "server2", "server3"]:
1177 self.failUnlessIn(server, unique_servers)
1178 # servers_with_unique_shares expects to receive some object with
1179 # a peerid attribute. So we make a FakePeerTracker whose only
1180 # job is to have a peerid attribute.
1181 class FakePeerTracker:
# NOTE(review): the class body and parts of the loop below are missing
# in this copy (presumably 'pass' here, plus peerid assignment and a
# trackers.append(t) in the loop); 'trackers' is never initialized in
# the visible code. Confirm against version control.
1184 for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1185 t = FakePeerTracker()
1189 # Recall that there are 3 unique servers in test1. Since none of
1190 # those overlap with the ones in trackers, we should get 7 back
1191 unique_servers = upload.servers_with_unique_shares(test1, set(trackers))
1192 self.failUnlessEqual(7, len(unique_servers))
1193 expected_servers = ["server" + str(i) for i in xrange(1, 9)]
1194 expected_servers.remove("server4")
1195 for server in expected_servers:
1196 self.failUnlessIn(server, unique_servers)
1197 # Now add an overlapping server to trackers.
1198 t = FakePeerTracker()
1199 t.peerid = "server1"
# NOTE(review): lines missing here -- presumably trackers.append(t)
# (and possibly a bucket assignment on the tracker).
1202 unique_servers = upload.servers_with_unique_shares(test1, set(trackers))
1203 self.failUnlessEqual(7, len(unique_servers))
# Still 7: the new tracker's "server1" overlaps an existing share.
1204 for server in expected_servers:
1205 self.failUnlessIn(server, unique_servers)
# NOTE(review): the definition of 'test' used below is missing
# (presumably 'test = {}' for the empty-input case).
1207 unique_servers = upload.servers_with_unique_shares(test)
1208 self.failUnlessEqual(0, len(test))
# NOTE(review): this asserts len(test) rather than len(unique_servers),
# which is vacuously true for an empty dict -- likely it was meant to
# check len(unique_servers); confirm and fix upstream.
1211 def test_shares_by_server(self):
# Unit test for upload.shares_by_server(), which inverts a
# shnum -> peerid dict into a peerid -> set-of-shnums mapping.
1212 test = dict([(i, "server%d" % i) for i in xrange(1, 5)])
1213 shares_by_server = upload.shares_by_server(test)
# One share per server: each server maps to a singleton set.
1214 self.failUnlessEqual(set([1]), shares_by_server["server1"])
1215 self.failUnlessEqual(set([2]), shares_by_server["server2"])
1216 self.failUnlessEqual(set([3]), shares_by_server["server3"])
1217 self.failUnlessEqual(set([4]), shares_by_server["server4"])
# NOTE(review): the construction of 'test1' is missing from this copy;
# from the assertions below it maps shares 1-3 to "server1" and
# shares 4-5 to "server2". Confirm against version control.
1225 shares_by_server = upload.shares_by_server(test1)
1226 self.failUnlessEqual(set([1, 2, 3]), shares_by_server["server1"])
1227 self.failUnlessEqual(set([4, 5]), shares_by_server["server2"])
1230 def test_existing_share_detection(self):
# The peer selector must count shares that already live on a read-only
# server toward servers-of-happiness rather than filtering them out.
1231 self.basedir = self.mktemp()
1232 d = self._setup_and_upload()
1233 # Our final setup should look like this:
1234 # server 1: shares 1 - 10, read-only
# NOTE(review): the rest of the layout comment (servers 2-4) is
# missing in this copy.
1238 # The purpose of this test is to make sure that the peer selector
1239 # knows about the shares on server 1, even though it is read-only.
1240 # It used to simply filter these out, which would cause the test
1241 # to fail when servers_of_happiness = 4.
1242 d.addCallback(lambda ign:
1243 self._add_server_with_share(1, 0, True))
# Positional args here match the keyword form used elsewhere:
# (server_number=1, share_number=0, readonly=True).
1244 d.addCallback(lambda ign:
1245 self._add_server_with_share(2))
1246 d.addCallback(lambda ign:
1247 self._add_server_with_share(3))
1248 d.addCallback(lambda ign:
1249 self._add_server_with_share(4))
1250 def _copy_shares(ign):
# Fill server 1 with the remaining shares before the upload runs.
1251 for i in xrange(1, 10):
1252 self._copy_share_to_server(i, 1)
1253 d.addCallback(_copy_shares)
1254 d.addCallback(lambda ign:
1255 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1256 def _prepare_client(ign):
1257 client = self.g.clients[0]
1258 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
# NOTE(review): 'return client' appears to be missing here; the next
# callback expects the client as its argument.
1260 d.addCallback(_prepare_client)
1261 d.addCallback(lambda client:
1262 client.upload(upload.Data("data" * 10000, convergence="")))
# NOTE(review): trailing 'return d' appears to be missing.
1266 def test_should_add_server(self):
# Unit test for upload.should_add_server(existing_shares, peerid,
# shnum); existing_shares is a shnum -> peerid dict, as in the other
# helper tests above.
1267 shares = dict([(i, "server%d" % i) for i in xrange(10)])
1268 self.failIf(upload.should_add_server(shares, "server1", 4))
1269 shares[4] = "server1"
1270 self.failUnless(upload.should_add_server(shares, "server4", 4))
# NOTE(review): a line is missing before the next assertion in this
# copy (presumably another mutation of 'shares'); confirm against
# version control before relying on the assertion below.
1272 self.failUnless(upload.should_add_server(shares, "server1", 1))
1275 def test_exception_messages_during_peer_selection(self):
# Both uploads here are expected to fail; the point is to pin down the
# exact UploadUnhappinessError text so the failure report correctly
# distinguishes "server full" refusals from server errors.
1276 # server 1: readonly, no shares
1277 # server 2: readonly, no shares
1278 # server 3: readonly, no shares
1279 # server 4: readonly, no shares
1280 # server 5: readonly, no shares
1281 # This will fail, but we want to make sure that the log messages
1282 # are informative about why it has failed.
1283 self.basedir = self.mktemp()
1284 d = self._setup_and_upload()
1285 d.addCallback(lambda ign:
1286 self._add_server_with_share(server_number=1, readonly=True))
1287 d.addCallback(lambda ign:
1288 self._add_server_with_share(server_number=2, readonly=True))
1289 d.addCallback(lambda ign:
1290 self._add_server_with_share(server_number=3, readonly=True))
1291 d.addCallback(lambda ign:
1292 self._add_server_with_share(server_number=4, readonly=True))
1293 d.addCallback(lambda ign:
1294 self._add_server_with_share(server_number=5, readonly=True))
1295 d.addCallback(lambda ign:
1296 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1297 def _reset_encoding_parameters(ign):
1298 client = self.g.clients[0]
1299 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
# NOTE(review): 'return client' appears to be missing here; the next
# callback expects the client object.
1301 d.addCallback(_reset_encoding_parameters)
1302 d.addCallback(lambda client:
1303 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
# All five read-only peers refuse: "5 placed none due to the server
# being full and 0 placed none due to an error".
1304 "peer selection failed for <Tahoe2PeerSelector "
1305 "for upload dglev>: placed 0 shares out of 10 "
1306 "total (10 homeless), want to place on 4 servers,"
1307 " sent 5 queries to 5 peers, 0 queries placed "
1308 "some shares, 5 placed none "
1309 "(of which 5 placed none due to the server being "
1310 "full and 0 placed none due to an error)",
# NOTE(review): a line is missing above this argument -- presumably
# 'client.upload,' (the callable that shouldFail invokes).
1312 upload.Data("data" * 10000, convergence="")))
# Second scenario: same as above, but server 2 is writable and broken
# rather than read-only, so the error tally changes to 4 full + 1 error.
1315 # server 1: readonly, no shares
1316 # server 2: broken, no shares
1317 # server 3: readonly, no shares
1318 # server 4: readonly, no shares
1319 # server 5: readonly, no shares
# NOTE(review): the 'def _reset(ign):' line for the helper used below
# appears to be missing in this copy; the next line is its body.
1321 self.basedir = self.mktemp()
1322 d.addCallback(_reset)
1323 d.addCallback(lambda ign:
1324 self._setup_and_upload())
1325 d.addCallback(lambda ign:
1326 self._add_server_with_share(server_number=1, readonly=True))
1327 d.addCallback(lambda ign:
1328 self._add_server_with_share(server_number=2))
1329 def _break_server_2(ign):
1330 server = self.g.servers_by_number[2].my_nodeid
1331 # We have to break the server in servers_by_id,
1332 # because the ones in servers_by_number isn't wrapped,
1333 # and doesn't look at its broken attribute
1334 self.g.servers_by_id[server].broken = True
1335 d.addCallback(_break_server_2)
1336 d.addCallback(lambda ign:
1337 self._add_server_with_share(server_number=3, readonly=True))
1338 d.addCallback(lambda ign:
1339 self._add_server_with_share(server_number=4, readonly=True))
1340 d.addCallback(lambda ign:
1341 self._add_server_with_share(server_number=5, readonly=True))
1342 d.addCallback(lambda ign:
1343 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1344 def _reset_encoding_parameters(ign):
1345 client = self.g.clients[0]
1346 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
# NOTE(review): 'return client' appears to be missing here too.
1348 d.addCallback(_reset_encoding_parameters)
1349 d.addCallback(lambda client:
1350 self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1351 "peer selection failed for <Tahoe2PeerSelector "
1352 "for upload dglev>: placed 0 shares out of 10 "
1353 "total (10 homeless), want to place on 4 servers,"
1354 " sent 5 queries to 5 peers, 0 queries placed "
1355 "some shares, 5 placed none "
1356 "(of which 4 placed none due to the server being "
1357 "full and 1 placed none due to an error)",
# NOTE(review): a line is missing above this argument -- presumably
# 'client.upload,' -- and the trailing 'return d' is also missing.
1359 upload.Data("data" * 10000, convergence="")))
1363 def _set_up_nodes_extra_config(self, clientdir):
# Test-harness hook: rewrite this client's tahoe.cfg so uploads use
# shares.needed = 7 and shares.total = 12 (k=7 / N=12 encoding).
1364 cfgfn = os.path.join(clientdir, "tahoe.cfg")
# Read the existing config so it can be preserved/re-appended; note the
# file handle from open() is never explicitly closed in the visible code.
1365 oldcfg = open(cfgfn, "r").read()
# Reopen for writing, truncating the file.
1366 f = open(cfgfn, "wt")
# NOTE(review): lines are missing here in this copy (presumably more
# f.write calls preceding the [client] section).
1369 f.write("[client]\n")
1370 f.write("shares.needed = 7\n")
1371 f.write("shares.total = 12\n")
# NOTE(review): the remainder of this method (re-appending oldcfg,
# f.close(), etc.) is missing from this copy; confirm against version
# control before relying on its behavior.
1377 # TODO: upload with exactly 75 peers (shares_of_happiness)
1378 # TODO: have a download fail
1379 # TODO: cancel a download (need to implement more cancel stuff)