1 import os, shutil
3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.python.failure import Failure
6 from twisted.python import log
7 from twisted.internet import defer
8 from foolscap.api import fireEventually
10 import allmydata # for __full_version__
11 from allmydata import uri, monitor, client
12 from allmydata.immutable import upload, encode
13 from allmydata.interfaces import FileTooLargeError, NoSharesError, \
14 NotEnoughSharesError
15 from allmydata.util.assertutil import precondition
16 from allmydata.util.deferredutil import DeferredListShouldSucceed
17 from no_network import GridTestMixin
18 from common_util import ShouldFailMixin
19 from allmydata.storage_client import StorageFarmBroker
20 from allmydata.storage.server import storage_index_to_dir
22 MiB = 1024*1024
24 def extract_uri(results):
25 return results.uri
27 # Some of these took longer than 480 seconds on Zandr's arm box, but this may
28 # have been due to an earlier test ERROR'ing out due to timeout, which seems
29 # to screw up subsequent tests.
32 class Uploadable(unittest.TestCase):
33 def shouldEqual(self, data, expected):
34 self.failUnless(isinstance(data, list))
35 for e in data:
36 self.failUnless(isinstance(e, str))
37 s = "".join(data)
38 self.failUnlessEqual(s, expected)
40 def test_filehandle_random_key(self):
41 return self._test_filehandle(convergence=None)
43 def test_filehandle_convergent_encryption(self):
44 return self._test_filehandle(convergence="some convergence string")
46 def _test_filehandle(self, convergence):
47 s = StringIO("a"*41)
48 u = upload.FileHandle(s, convergence=convergence)
49 d = u.get_size()
50 d.addCallback(self.failUnlessEqual, 41)
51 d.addCallback(lambda res: u.read(1))
52 d.addCallback(self.shouldEqual, "a")
53 d.addCallback(lambda res: u.read(80))
54 d.addCallback(self.shouldEqual, "a"*40)
55 d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
56 d.addCallback(lambda res: s.close()) # that privilege is reserved for us
57 return d
59 def test_filename(self):
60 basedir = "upload/Uploadable/test_filename"
62 fn = os.path.join(basedir, "file")
63 f = open(fn, "wb")
64 f.write("a"*41)
65 f.close()
66 u = upload.FileName(fn, convergence=None)
67 d = u.get_size()
68 d.addCallback(self.failUnlessEqual, 41)
69 d.addCallback(lambda res: u.read(1))
70 d.addCallback(self.shouldEqual, "a")
71 d.addCallback(lambda res: u.read(80))
72 d.addCallback(self.shouldEqual, "a"*40)
73 d.addCallback(lambda res: u.close())
74 return d
76 def test_data(self):
77 s = "a"*41
78 u = upload.Data(s, convergence=None)
79 d = u.get_size()
80 d.addCallback(self.failUnlessEqual, 41)
81 d.addCallback(lambda res: u.read(1))
82 d.addCallback(self.shouldEqual, "a")
83 d.addCallback(lambda res: u.read(80))
84 d.addCallback(self.shouldEqual, "a"*40)
85 d.addCallback(lambda res: u.close())
86 return d
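# Editor's note: a hedged, self-contained sketch (not from the original file)
# of the IUploadable contract the three tests above exercise: get_size() and
# read(n) return Deferreds, and read(n) fires with a *list* of strings
# totalling at most n bytes (an empty list once the data is exhausted). The
# helper name below is illustrative only.
def _drain_uploadable(u, chunksize=1024):
    # keep calling read() until it returns no more data, then join the chunks
    chunks = []
    def _got(data):
        if not data:
            return "".join(chunks)
        chunks.extend(data)
        d2 = u.read(chunksize)
        d2.addCallback(_got)
        return d2
    d = u.read(chunksize)
    d.addCallback(_got)
    return d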
88 class ServerError(Exception):
89 pass
91 class SetDEPMixin:
92 def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
93 p = {"k": k,
94 "happy": happy,
95 "n": n,
96 "max_segment_size": max_segsize,
97 }
98 self.node.DEFAULT_ENCODING_PARAMETERS = p
100 class FakeStorageServer:
101 def __init__(self, mode):
102 self.mode = mode
103 self.allocated = []
104 self.queries = 0
105 self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
106 { "maximum-immutable-share-size": 2**32 },
107 "application-version": str(allmydata.__full_version__),
108 }
109 if mode == "small":
110 self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
111 { "maximum-immutable-share-size": 10 },
112 "application-version": str(allmydata.__full_version__),
113 }
116 def callRemote(self, methname, *args, **kwargs):
117 def _call():
118 meth = getattr(self, methname)
119 return meth(*args, **kwargs)
120 d = fireEventually()
121 d.addCallback(lambda res: _call())
122 return d
124 def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
125 sharenums, share_size, canary):
126 #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
127 if self.mode == "first-fail":
128 if self.queries == 0:
129 raise ServerError
130 if self.mode == "second-fail":
131 if self.queries == 1:
132 raise ServerError
133 self.queries += 1
134 if self.mode == "full":
135 return (set(), {},)
136 elif self.mode == "already got them":
137 return (set(sharenums), {},)
138 else:
139 for shnum in sharenums:
140 self.allocated.append( (storage_index, shnum) )
141 return (set(),
142 dict([( shnum, FakeBucketWriter(share_size) )
143 for shnum in sharenums]),
144 )
146 class FakeBucketWriter:
147 # a diagnostic version of storageserver.BucketWriter
148 def __init__(self, size):
149 self.data = StringIO()
150 self.closed = False
151 self._size = size
153 def callRemote(self, methname, *args, **kwargs):
154 def _call():
155 meth = getattr(self, "remote_" + methname)
156 return meth(*args, **kwargs)
157 d = fireEventually()
158 d.addCallback(lambda res: _call())
159 return d
161 def remote_write(self, offset, data):
162 precondition(not self.closed)
163 precondition(offset >= 0)
164 precondition(offset+len(data) <= self._size,
165 "offset=%d + data=%d > size=%d" %
166 (offset, len(data), self._size))
167 self.data.seek(offset)
168 self.data.write(data)
170 def remote_close(self):
171 precondition(not self.closed)
172 self.closed = True
174 def remote_abort(self):
175 log.err(RuntimeError("uh oh, I was asked to abort"))
177 class FakeClient:
178 DEFAULT_ENCODING_PARAMETERS = {"k":25,
179 "happy": 75,
180 "n": 100,
181 "max_segment_size": 1*MiB,
182 }
183 def __init__(self, mode="good", num_servers=50):
184 self.num_servers = num_servers
185 if isinstance(mode, str):
186 mode = dict((i, mode) for i in range(num_servers))
187 peers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
188 for fakeid in range(self.num_servers) ]
189 self.storage_broker = StorageFarmBroker(None, permute_peers=True)
190 for (serverid, server) in peers:
191 self.storage_broker.test_add_server(serverid, server)
192 self.last_peers = [p[1] for p in peers]
194 def log(self, *args, **kwargs):
195 pass
196 def get_encoding_parameters(self):
197 return self.DEFAULT_ENCODING_PARAMETERS
198 def get_storage_broker(self):
199 return self.storage_broker
200 _secret_holder = client.SecretHolder("lease secret", "convergence secret")
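# Editor's note (not in the original): the "%20d" % fakeid trick above matters
# because Tahoe serverids are 20-byte strings (SHA-1-sized nodeids); padding
# the integer into a fixed 20-character field gives StorageFarmBroker unique,
# correctly-sized, sortable serverids for the fake servers.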
202 class GotTooFarError(Exception):
205 class GiganticUploadable(upload.FileHandle):
206 def __init__(self, size):
207 self._size = size
208 self._fp = 0
210 def get_encryption_key(self):
211 return defer.succeed("\x00" * 16)
212 def get_size(self):
213 return defer.succeed(self._size)
214 def read(self, length):
215 left = self._size - self._fp
216 length = min(left, length)
217 self._fp += length
218 if self._fp > 1000000:
219 # terminate the test early.
220 raise GotTooFarError("we shouldn't be allowed to get this far")
221 return defer.succeed(["\x00" * length])
225 DATA = """
226 Once upon a time, there was a beautiful princess named Buttercup. She lived
227 in a magical land where every file was stored securely among millions of
228 machines, and nobody ever worried about their data being lost ever again.
229 """
231 assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
233 SIZE_ZERO = 0
234 SIZE_SMALL = 16
235 SIZE_LARGE = len(DATA)
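# Editor's note (not in the original): URI_LIT_SIZE_THRESHOLD is the cutoff
# below which an upload returns a LIT ("literal") URI with the file's bytes
# embedded in the URI itself, instead of erasure-coding the file into CHK
# shares. _check_small()/_check_large() in the test classes below assert
# which of the two kinds of URI came back for a given size.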
237 def upload_data(uploader, data):
238 u = upload.Data(data, convergence=None)
239 return uploader.upload(u)
240 def upload_filename(uploader, filename):
241 u = upload.FileName(filename, convergence=None)
242 return uploader.upload(u)
243 def upload_filehandle(uploader, fh):
244 u = upload.FileHandle(fh, convergence=None)
245 return uploader.upload(u)
247 class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
248 def setUp(self):
249 self.node = FakeClient(mode="good")
250 self.u = upload.Uploader()
251 self.u.running = True
252 self.u.parent = self.node
254 def _check_small(self, newuri, size):
255 u = uri.from_string(newuri)
256 self.failUnless(isinstance(u, uri.LiteralFileURI))
257 self.failUnlessEqual(len(u.data), size)
259 def _check_large(self, newuri, size):
260 u = uri.from_string(newuri)
261 self.failUnless(isinstance(u, uri.CHKFileURI))
262 self.failUnless(isinstance(u.get_storage_index(), str))
263 self.failUnlessEqual(len(u.get_storage_index()), 16)
264 self.failUnless(isinstance(u.key, str))
265 self.failUnlessEqual(len(u.key), 16)
266 self.failUnlessEqual(u.size, size)
268 def get_data(self, size):
269 return DATA[:size]
271 def test_too_large(self):
272 # we've removed the 4GiB share size limit (see ticket #346 for
273 # details), but still have an 8-byte field, so the limit is now
274 # 2**64, so make sure we reject files larger than that.
275 k = 3; happy = 7; n = 10
276 self.set_encoding_parameters(k, happy, n)
277 big = k*(2**64)
278 data1 = GiganticUploadable(big)
279 d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
280 "This file is too large to be uploaded (data_size)",
281 self.u.upload, data1)
282 data2 = GiganticUploadable(big-3)
283 d.addCallback(lambda res:
284 self.shouldFail(FileTooLargeError,
285 "test_too_large-data2",
286 "This file is too large to be uploaded (offsets)",
287 self.u.upload, data2))
288 # I don't know where the actual limit is.. it depends upon how large
289 # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
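# Editor's note (illustrative arithmetic, not original code): the share
# format uses 8-byte offset fields, so a single share can address at most
# 2**64 bytes. Each share holds roughly file_size/k bytes of data plus
# hash-tree overhead, which is why the true ceiling sits a little below
# k * 2**64 (the per-share data can fit while the trailing offsets overflow,
# hence the separate "(data_size)" and "(offsets)" failures above).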
292 def test_data_zero(self):
293 data = self.get_data(SIZE_ZERO)
294 d = upload_data(self.u, data)
295 d.addCallback(extract_uri)
296 d.addCallback(self._check_small, SIZE_ZERO)
299 def test_data_small(self):
300 data = self.get_data(SIZE_SMALL)
301 d = upload_data(self.u, data)
302 d.addCallback(extract_uri)
303 d.addCallback(self._check_small, SIZE_SMALL)
306 def test_data_large(self):
307 data = self.get_data(SIZE_LARGE)
308 d = upload_data(self.u, data)
309 d.addCallback(extract_uri)
310 d.addCallback(self._check_large, SIZE_LARGE)
313 def test_data_large_odd_segments(self):
314 data = self.get_data(SIZE_LARGE)
315 segsize = int(SIZE_LARGE / 2.5)
316 # we want 3 segments, since that's not a power of two
317 self.set_encoding_parameters(25, 75, 100, segsize)
318 d = upload_data(self.u, data)
319 d.addCallback(extract_uri)
320 d.addCallback(self._check_large, SIZE_LARGE)
323 def test_filehandle_zero(self):
324 data = self.get_data(SIZE_ZERO)
325 d = upload_filehandle(self.u, StringIO(data))
326 d.addCallback(extract_uri)
327 d.addCallback(self._check_small, SIZE_ZERO)
330 def test_filehandle_small(self):
331 data = self.get_data(SIZE_SMALL)
332 d = upload_filehandle(self.u, StringIO(data))
333 d.addCallback(extract_uri)
334 d.addCallback(self._check_small, SIZE_SMALL)
337 def test_filehandle_large(self):
338 data = self.get_data(SIZE_LARGE)
339 d = upload_filehandle(self.u, StringIO(data))
340 d.addCallback(extract_uri)
341 d.addCallback(self._check_large, SIZE_LARGE)
344 def test_filename_zero(self):
345 fn = "Uploader-test_filename_zero.data"
346 f = open(fn, "wb")
347 data = self.get_data(SIZE_ZERO)
348 f.write(data)
349 f.close()
350 d = upload_filename(self.u, fn)
351 d.addCallback(extract_uri)
352 d.addCallback(self._check_small, SIZE_ZERO)
355 def test_filename_small(self):
356 fn = "Uploader-test_filename_small.data"
357 f = open(fn, "wb")
358 data = self.get_data(SIZE_SMALL)
359 f.write(data)
360 f.close()
361 d = upload_filename(self.u, fn)
362 d.addCallback(extract_uri)
363 d.addCallback(self._check_small, SIZE_SMALL)
366 def test_filename_large(self):
367 fn = "Uploader-test_filename_large.data"
368 f = open(fn, "wb")
369 data = self.get_data(SIZE_LARGE)
370 f.write(data)
371 f.close()
372 d = upload_filename(self.u, fn)
373 d.addCallback(extract_uri)
374 d.addCallback(self._check_large, SIZE_LARGE)
377 class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
378 def make_node(self, mode, num_servers=10):
379 self.node = FakeClient(mode, num_servers)
380 self.u = upload.Uploader()
381 self.u.running = True
382 self.u.parent = self.node
384 def _check_large(self, newuri, size):
385 u = uri.from_string(newuri)
386 self.failUnless(isinstance(u, uri.CHKFileURI))
387 self.failUnless(isinstance(u.get_storage_index(), str))
388 self.failUnlessEqual(len(u.get_storage_index()), 16)
389 self.failUnless(isinstance(u.key, str))
390 self.failUnlessEqual(len(u.key), 16)
391 self.failUnlessEqual(u.size, size)
393 def test_first_error(self):
394 mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
395 self.make_node(mode)
396 d = upload_data(self.u, DATA)
397 d.addCallback(extract_uri)
398 d.addCallback(self._check_large, SIZE_LARGE)
401 def test_first_error_all(self):
402 self.make_node("first-fail")
403 d = self.shouldFail(NoSharesError, "first_error_all",
404 "peer selection failed",
405 upload_data, self.u, DATA)
406 def _check(f):
407 self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
408 # there should also be a 'last failure was' message
409 self.failUnlessIn("ServerError", str(f.value))
410 d.addCallback(_check)
413 def test_second_error(self):
414 # we want to make sure we make it to a third pass. This means that
415 # the first pass was insufficient to place all shares, and at least
416 # one of second pass servers (other than the last one) accepted a
417 # share (so we'll believe that a third pass will be useful). (if
418 # everyone but the last server throws an error, then we'll send all
419 # the remaining shares to the last server at the end of the second
420 # pass, and if that succeeds, we won't make it to a third pass).
422 # we can achieve this 97.5% of the time by using 40 servers, having
423 # 39 of them fail on the second request, leaving only one to succeed
424 # on the second request. (we need to keep the number of servers low
425 # enough to ensure a second pass with 100 shares).
# (editor's note: the 2.5% failure case is the permuted server order putting
# the single good server last, which happens with probability 1/40.)
426 mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
427 self.make_node(mode, 40)
428 d = upload_data(self.u, DATA)
429 d.addCallback(extract_uri)
430 d.addCallback(self._check_large, SIZE_LARGE)
433 def test_second_error_all(self):
434 self.make_node("second-fail")
435 d = self.shouldFail(NotEnoughSharesError, "second_error_all",
436 "peer selection failed",
437 upload_data, self.u, DATA)
438 def _check(f):
439 self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
440 # there should also be a 'last failure was' message
441 self.failUnlessIn("ServerError", str(f.value))
442 d.addCallback(_check)
445 class FullServer(unittest.TestCase):
446 def setUp(self):
447 self.node = FakeClient(mode="full")
448 self.u = upload.Uploader()
449 self.u.running = True
450 self.u.parent = self.node
452 def _should_fail(self, f):
453 self.failUnless(isinstance(f, Failure) and f.check(NoSharesError), f)
455 def test_data_large(self):
457 d = upload_data(self.u, data)
458 d.addBoth(self._should_fail)
459 return d
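# Editor's note (not in the original): in "full" mode the FakeStorageServer's
# allocate_buckets accepts nothing (no already-held shares, no new buckets),
# so no share can ever be placed and the upload must fail with NoSharesError,
# which _should_fail() above asserts.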
461 class PeerSelection(unittest.TestCase):
463 def make_client(self, num_servers=50):
464 self.node = FakeClient(mode="good", num_servers=num_servers)
465 self.u = upload.Uploader()
466 self.u.running = True
467 self.u.parent = self.node
469 def get_data(self, size):
470 return DATA[:size]
472 def _check_large(self, newuri, size):
473 u = uri.from_string(newuri)
474 self.failUnless(isinstance(u, uri.CHKFileURI))
475 self.failUnless(isinstance(u.get_storage_index(), str))
476 self.failUnlessEqual(len(u.get_storage_index()), 16)
477 self.failUnless(isinstance(u.key, str))
478 self.failUnlessEqual(len(u.key), 16)
479 self.failUnlessEqual(u.size, size)
481 def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
482 p = {"k": k,
483 "happy": happy,
484 "n": n,
485 "max_segment_size": max_segsize,
486 }
487 self.node.DEFAULT_ENCODING_PARAMETERS = p
489 def test_one_each(self):
490 # if we have 50 shares, and there are 50 peers, and they all accept a
491 # share, we should get exactly one share per peer
493 self.make_client()
494 data = self.get_data(SIZE_LARGE)
495 self.set_encoding_parameters(25, 30, 50)
496 d = upload_data(self.u, data)
497 d.addCallback(extract_uri)
498 d.addCallback(self._check_large, SIZE_LARGE)
499 def _check(res):
500 for p in self.node.last_peers:
501 allocated = p.allocated
502 self.failUnlessEqual(len(allocated), 1)
503 self.failUnlessEqual(p.queries, 1)
504 d.addCallback(_check)
507 def test_two_each(self):
508 # if we have 100 shares, and there are 50 peers, and they all accept
509 # all shares, we should get exactly two shares per peer
511 self.make_client()
512 data = self.get_data(SIZE_LARGE)
513 self.set_encoding_parameters(50, 75, 100)
514 d = upload_data(self.u, data)
515 d.addCallback(extract_uri)
516 d.addCallback(self._check_large, SIZE_LARGE)
517 def _check(res):
518 for p in self.node.last_peers:
519 allocated = p.allocated
520 self.failUnlessEqual(len(allocated), 2)
521 self.failUnlessEqual(p.queries, 2)
522 d.addCallback(_check)
525 def test_one_each_plus_one_extra(self):
526 # if we have 51 shares, and there are 50 peers, then one peer gets
527 # two shares and the rest get just one
529 self.make_client()
530 data = self.get_data(SIZE_LARGE)
531 self.set_encoding_parameters(24, 41, 51)
532 d = upload_data(self.u, data)
533 d.addCallback(extract_uri)
534 d.addCallback(self._check_large, SIZE_LARGE)
535 def _check(res):
536 got_one = []
537 got_two = []
538 for p in self.node.last_peers:
539 allocated = p.allocated
540 self.failUnless(len(allocated) in (1,2), len(allocated))
541 if len(allocated) == 1:
542 self.failUnlessEqual(p.queries, 1)
543 got_one.append(p)
544 else:
545 self.failUnlessEqual(p.queries, 2)
546 got_two.append(p)
547 self.failUnlessEqual(len(got_one), 49)
548 self.failUnlessEqual(len(got_two), 1)
549 d.addCallback(_check)
552 def test_four_each(self):
553 # if we have 200 shares, and there are 50 peers, then each peer gets
554 # 4 shares. The design goal is to accomplish this with only two
555 # queries per peer.
557 self.make_client()
558 data = self.get_data(SIZE_LARGE)
559 self.set_encoding_parameters(100, 150, 200)
560 d = upload_data(self.u, data)
561 d.addCallback(extract_uri)
562 d.addCallback(self._check_large, SIZE_LARGE)
563 def _check(res):
564 for p in self.node.last_peers:
565 allocated = p.allocated
566 self.failUnlessEqual(len(allocated), 4)
567 self.failUnlessEqual(p.queries, 2)
568 d.addCallback(_check)
571 def test_three_of_ten(self):
572 # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
573 # 4+4+2.
575 self.make_client(num_servers=3)
576 data = self.get_data(SIZE_LARGE)
577 self.set_encoding_parameters(3, 5, 10)
578 d = upload_data(self.u, data)
579 d.addCallback(extract_uri)
580 d.addCallback(self._check_large, SIZE_LARGE)
581 def _check(res):
582 counts = {}
583 for p in self.node.last_peers:
584 allocated = p.allocated
585 counts[len(allocated)] = counts.get(len(allocated), 0) + 1
586 histogram = [counts.get(i, 0) for i in range(5)]
587 self.failUnlessEqual(histogram, [0,0,0,2,1])
588 d.addCallback(_check)
591 def test_some_big_some_small(self):
592 # 10 shares, 20 servers, but half the servers don't support a
593 # share-size large enough for our file
594 mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)])
595 self.node = FakeClient(mode, num_servers=20)
596 self.u = upload.Uploader()
597 self.u.running = True
598 self.u.parent = self.node
600 data = self.get_data(SIZE_LARGE)
601 self.set_encoding_parameters(3, 5, 10)
602 d = upload_data(self.u, data)
603 d.addCallback(extract_uri)
604 d.addCallback(self._check_large, SIZE_LARGE)
605 def _check(res):
606 # we should have put one share each on the big peers, and zero
607 # shares on the small peers
608 total_allocated = 0
609 for p in self.node.last_peers:
610 if p.mode == "good":
611 self.failUnlessEqual(len(p.allocated), 1)
612 elif p.mode == "small":
613 self.failUnlessEqual(len(p.allocated), 0)
614 total_allocated += len(p.allocated)
615 self.failUnlessEqual(total_allocated, 10)
616 d.addCallback(_check)
617 return d
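# Editor's note: a hedged, self-contained sketch (not Tahoe's real code) of
# the permuted-ring idea the PeerSelection tests above rely on: each storage
# index induces its own stable ordering of the servers, and shares are dealt
# out round-robin over that ordering, which is why 50 shares over 50 servers
# give one each, 100 give two each, and 10 shares over 3 servers split 4/3/3.
# The hash choice and key layout below are illustrative assumptions.
import hashlib

def permuted_order(peerids, storage_index):
    # per-file, stable permutation: sort servers by a hash tied to this file
    return sorted(peerids,
                  key=lambda peerid:
                      hashlib.sha1(peerid + storage_index).digest())

def deal_shares(num_shares, peerids, storage_index):
    # round-robin share assignment over the permuted server list
    order = permuted_order(peerids, storage_index)
    assignment = dict((peerid, []) for peerid in order)
    for shnum in range(num_shares):
        assignment[order[shnum % len(order)]].append(shnum)
    return assignment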
620 class StorageIndex(unittest.TestCase):
621 def test_params_must_matter(self):
622 DATA = "I am some data"
623 u = upload.Data(DATA, convergence="")
624 eu = upload.EncryptAnUploadable(u)
625 d1 = eu.get_storage_index()
627 # CHK means the same data should encrypt the same way
628 u = upload.Data(DATA, convergence="")
629 eu = upload.EncryptAnUploadable(u)
630 d1a = eu.get_storage_index()
632 # but if we use a different convergence string it should be different
633 u = upload.Data(DATA, convergence="wheee!")
634 eu = upload.EncryptAnUploadable(u)
635 d1salt1 = eu.get_storage_index()
637 # and if we add yet a different convergence it should be different again
638 u = upload.Data(DATA, convergence="NOT wheee!")
639 eu = upload.EncryptAnUploadable(u)
640 d1salt2 = eu.get_storage_index()
642 # and if we use the first string again it should be the same as last time
643 u = upload.Data(DATA, convergence="wheee!")
644 eu = upload.EncryptAnUploadable(u)
645 d1salt1a = eu.get_storage_index()
647 # and if we change the encoding parameters, it should be different (from the same convergence string with different encoding parameters)
648 u = upload.Data(DATA, convergence="")
649 u.encoding_param_k = u.default_encoding_param_k + 1
650 eu = upload.EncryptAnUploadable(u)
651 d2 = eu.get_storage_index()
653 # and if we use a random key, it should be different than the CHK
654 u = upload.Data(DATA, convergence=None)
655 eu = upload.EncryptAnUploadable(u)
656 d3 = eu.get_storage_index()
657 # and different from another instance
658 u = upload.Data(DATA, convergence=None)
659 eu = upload.EncryptAnUploadable(u)
660 d4 = eu.get_storage_index()
662 d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
663 def _check(res):
664 si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
665 self.failUnlessEqual(si1, si1a)
666 self.failIfEqual(si1, si2)
667 self.failIfEqual(si1, si3)
668 self.failIfEqual(si1, si4)
669 self.failIfEqual(si3, si4)
670 self.failIfEqual(si1salt1, si1)
671 self.failIfEqual(si1salt1, si1salt2)
672 self.failIfEqual(si1salt2, si1)
673 self.failUnlessEqual(si1salt1, si1salt1a)
674 d.addCallback(_check)
675 return d
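# Editor's note: a hedged sketch, not Tahoe's actual key-derivation code, of
# the relationships the test above asserts: the CHK encryption key is derived
# from the convergence secret, the encoding parameters, and the content, and
# the storage index is derived from the key. Identical
# (content, convergence, params) triples therefore collide on purpose, while
# changing any element (or using a random key) yields a new storage index.
# Hash choices, field encoding, and truncation below are assumptions.
import hashlib

def sketch_chk_storage_index(content, convergence, k, n, segsize):
    h = hashlib.sha256()
    h.update(hashlib.sha256(convergence).digest())
    h.update("%d,%d,%d;" % (k, n, segsize))   # encoding parameters matter too
    h.update(hashlib.sha256(content).digest())
    key = h.digest()[:16]                     # 16-byte AES key
    return hashlib.sha256(key).digest()[:16]  # 16-byte storage index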
677 class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
678 ShouldFailMixin):
679 def _do_upload_with_broken_servers(self, servers_to_break):
680 """
681 I act like a normal upload, but before I send the results of
682 Tahoe2PeerSelector to the Encoder, I break the first servers_to_break
683 PeerTrackers in the used_peers part of the return result.
684 """
685 assert self.g, "I tried to find a grid at self.g, but failed"
686 broker = self.g.clients[0].storage_broker
687 sh = self.g.clients[0]._secret_holder
688 data = upload.Data("data" * 10000, convergence="")
689 data.encoding_param_k = 3
690 data.encoding_param_happy = 4
691 data.encoding_param_n = 10
692 uploadable = upload.EncryptAnUploadable(data)
693 encoder = encode.Encoder()
694 encoder.set_encrypted_uploadable(uploadable)
695 status = upload.UploadStatus()
696 selector = upload.Tahoe2PeerSelector("dglev", "test", status)
697 storage_index = encoder.get_param("storage_index")
698 share_size = encoder.get_param("share_size")
699 block_size = encoder.get_param("block_size")
700 num_segments = encoder.get_param("num_segments")
701 d = selector.get_shareholders(broker, sh, storage_index,
702 share_size, block_size, num_segments,
704 def _have_shareholders((used_peers, already_peers)):
705 assert servers_to_break <= len(used_peers)
706 for index in xrange(servers_to_break):
707 server = list(used_peers)[index]
708 for share in server.buckets.keys():
709 server.buckets[share].abort()
710 buckets = {}
711 servermap = already_peers.copy()
712 for peer in used_peers:
713 buckets.update(peer.buckets)
714 for bucket in peer.buckets:
715 servermap[bucket] = peer.peerid
716 encoder.set_shareholders(buckets, servermap)
717 return encoder.start()
719 d.addCallback(_have_shareholders)
720 return d
722 def _add_server(self, server_number, readonly=False):
723 assert self.g, "I tried to find a grid at self.g, but failed"
724 assert self.shares, "I tried to find shares at self.shares, but failed"
725 ss = self.g.make_server(server_number, readonly)
726 self.g.add_server(server_number, ss)
728 def _add_server_with_share(self, server_number, share_number=None,
729 readonly=False):
730 self._add_server(server_number, readonly)
731 if share_number is not None:
732 self._copy_share_to_server(share_number, server_number)
734 def _copy_share_to_server(self, share_number, server_number):
735 ss = self.g.servers_by_number[server_number]
736 # Copy share i from the directory associated with the first
737 # storage server to the directory associated with this one.
738 assert self.g, "I tried to find a grid at self.g, but failed"
739 assert self.shares, "I tried to find shares at self.shares, but failed"
740 old_share_location = self.shares[share_number][2]
741 new_share_location = os.path.join(ss.storedir, "shares")
742 si = uri.from_string(self.uri).get_storage_index()
743 new_share_location = os.path.join(new_share_location,
744 storage_index_to_dir(si))
745 if not os.path.exists(new_share_location):
746 os.makedirs(new_share_location)
747 new_share_location = os.path.join(new_share_location,
748 str(share_number))
749 shutil.copy(old_share_location, new_share_location)
750 shares = self.find_shares(self.uri)
751 # Make sure that the storage server has the share.
752 self.failUnless((share_number, ss.my_nodeid, new_share_location)
753 in shares)
756 def _setup_and_upload(self):
757 """
758 I set up a NoNetworkGrid with a single server and client,
759 upload a file to it, store its uri in self.uri, and store its
760 sharedata in self.shares.
761 """
762 self.set_up_grid(num_clients=1, num_servers=1)
763 client = self.g.clients[0]
764 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
765 data = upload.Data("data" * 10000, convergence="")
767 d = client.upload(data)
768 def _store_uri(ur):
769 self.uri = ur.uri
770 d.addCallback(_store_uri)
771 d.addCallback(lambda ign:
772 self.find_shares(self.uri))
773 def _store_shares(shares):
774 self.shares = shares
775 d.addCallback(_store_shares)
776 return d
778 def test_configure_parameters(self):
779 self.basedir = self.mktemp()
780 hooks = {0: self._set_up_nodes_extra_config}
781 self.set_up_grid(client_config_hooks=hooks)
782 c0 = self.g.clients[0]
785 u = upload.Data(DATA, convergence="")
786 d = c0.upload(u)
787 d.addCallback(lambda ur: c0.create_node_from_uri(ur.uri))
788 m = monitor.Monitor()
789 d.addCallback(lambda fn: fn.check(m))
790 def _check(cr):
791 data = cr.get_data()
792 self.failUnlessEqual(data["count-shares-needed"], 7)
793 self.failUnlessEqual(data["count-shares-expected"], 12)
794 d.addCallback(_check)
795 return d
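# Editor's note (not in the original): the 7 and 12 asserted above come from
# _set_up_nodes_extra_config() near the end of this file, which rewrites each
# client's tahoe.cfg with "shares.needed = 7" and "shares.total = 12" before
# the nodes start, so the checker's count-shares-needed/-expected must echo
# those configured encoding parameters.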
797 def _setUp(self, ns):
798 # Used by test_happy_semantics and test_preexisting_share_behavior
799 # to set up the grid.
800 self.node = FakeClient(mode="good", num_servers=ns)
801 self.u = upload.Uploader()
802 self.u.running = True
803 self.u.parent = self.node
805 def test_happy_semantics(self):
806 self._setUp(2)
807 DATA = upload.Data("kittens" * 10000, convergence="")
808 # These parameters are unsatisfiable with the client that we've made
809 # -- we'll use them to test that the semantics work correctly.
810 self.set_encoding_parameters(k=3, happy=5, n=10)
811 d = self.shouldFail(NotEnoughSharesError, "test_happy_semantics",
812 "shares could only be placed on 2 servers "
813 "(5 were requested)",
815 # Let's reset the client to have 10 servers
816 d.addCallback(lambda ign:
817 self._setUp(10))
818 # These parameters are satisfiable with the client we've made.
819 d.addCallback(lambda ign:
820 self.set_encoding_parameters(k=3, happy=5, n=10))
822 d.addCallback(lambda ign:
823 upload_data(self.u, DATA))
824 # Let's reset the client to have 7 servers
825 # (this is less than n, but more than h)
826 d.addCallback(lambda ign:
827 self._setUp(7))
828 # These encoding parameters should still be satisfiable with our
829 # new client.
830 d.addCallback(lambda ign:
831 self.set_encoding_parameters(k=3, happy=5, n=10))
832 # This, then, should work.
833 d.addCallback(lambda ign:
834 upload_data(self.u, DATA))
835 return d
837 def test_problem_layouts(self):
838 self.basedir = self.mktemp()
839 # This scenario is at
840 # http://allmydata.org/trac/tahoe/ticket/778#comment:52
842 # The scenario in comment:52 proposes that we have a layout
843 # like:
844 # server 1: share 1
845 # server 2: share 1
846 # server 3: share 1
847 # server 4: shares 2 - 10
848 # To get access to the shares, we will first upload to one
849 # server, which will then have shares 1 - 10. We'll then
850 # add three new servers, configure them to not accept any new
851 # shares, then write share 1 directly into the serverdir of each.
852 # Then each of servers 1 - 3 will report that they have share 1,
853 # and will not accept any new share, while server 4 will report that
854 # it has shares 2 - 10 and will accept new shares.
855 # We'll then set 'happy' = 4, and see that an upload fails
857 d = self._setup_and_upload()
858 d.addCallback(lambda ign:
859 self._add_server_with_share(1, 0, True))
860 d.addCallback(lambda ign:
861 self._add_server_with_share(2, 0, True))
862 d.addCallback(lambda ign:
863 self._add_server_with_share(3, 0, True))
864 # Remove the first share from server 0.
865 def _remove_share_0():
866 share_location = self.shares[0][2]
867 os.remove(share_location)
868 d.addCallback(lambda ign:
869 _remove_share_0())
870 # Set happy = 4 in the client.
871 def _prepare():
872 client = self.g.clients[0]
873 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
874 return client
875 d.addCallback(lambda ign:
876 _prepare())
877 # Uploading data should fail
878 d.addCallback(lambda client:
879 self.shouldFail(NotEnoughSharesError, "test_happy_semantics",
880 "shares could only be placed on 1 servers "
881 "(4 were requested)",
882 client.upload, upload.Data("data" * 10000,
883 convergence="")))
886 # This scenario is at
887 # http://allmydata.org/trac/tahoe/ticket/778#comment:53
889 # Set up the grid to have one server
890 def _change_basedir(ign):
891 self.basedir = self.mktemp()
892 d.addCallback(_change_basedir)
893 d.addCallback(lambda ign:
894 self._setup_and_upload())
895 # We want to have a layout like this:
899 # server 4: shares 1 - 10
900 # (this is an expansion of Zooko's example because it is easier
901 # to code, but it will fail in the same way)
902 # To start, we'll create a server with shares 1-10 of the data
903 # we're about to upload.
904 # Next, we'll add three new servers to our NoNetworkGrid. We'll add
905 # one share from our initial upload to each of these.
906 # The counterintuitive ordering of the share numbers is to deal with
907 # the permuting of these servers -- distributing the shares this
908 # way ensures that the Tahoe2PeerSelector sees them in the order
910 d.addCallback(lambda ign:
911 self._add_server_with_share(server_number=1, share_number=2))
912 d.addCallback(lambda ign:
913 self._add_server_with_share(server_number=2, share_number=0))
914 d.addCallback(lambda ign:
915 self._add_server_with_share(server_number=3, share_number=1))
916 # So, we now have the following layout:
917 # server 0: shares 0 - 9
918 # server 1: share 2
919 # server 2: share 0
920 # server 3: share 1
921 # We want to change the 'happy' parameter in the client to 4.
922 # We then want to feed the upload process a list of peers that
923 # server 0 is at the front of, so we trigger Zooko's scenario.
924 # Ideally, a reupload of our original data should work.
925 def _reset_encoding_parameters(ign):
926 client = self.g.clients[0]
927 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
928 return client
929 d.addCallback(_reset_encoding_parameters)
930 # We need this to get around the fact that the old Data
931 # instance already has a happy parameter set.
932 d.addCallback(lambda client:
933 client.upload(upload.Data("data" * 10000, convergence="")))
936 # This scenario is basically comment:53, but with the order reversed;
937 # this means that the Tahoe2PeerSelector sees
938 # server 0: shares 1-10
942 d.addCallback(_change_basedir)
943 d.addCallback(lambda ign:
944 self._setup_and_upload())
945 d.addCallback(lambda ign:
946 self._add_server_with_share(server_number=2, share_number=0))
947 d.addCallback(lambda ign:
948 self._add_server_with_share(server_number=3, share_number=1))
949 d.addCallback(lambda ign:
950 self._add_server_with_share(server_number=1, share_number=2))
951 # Copy all of the other shares to server number 2
952 def _copy_shares(ign):
953 for i in xrange(1, 10):
954 self._copy_share_to_server(i, 2)
955 d.addCallback(_copy_shares)
956 # Remove the first server, and add a placeholder with share 0
957 d.addCallback(lambda ign:
958 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
959 d.addCallback(lambda ign:
960 self._add_server_with_share(server_number=0, share_number=0))
962 d.addCallback(_reset_encoding_parameters)
963 d.addCallback(lambda client:
964 client.upload(upload.Data("data" * 10000, convergence="")))
965 # Try the same thing, but with empty servers after the first one
966 # We want to make sure that Tahoe2PeerSelector will redistribute
967 # shares as necessary, not simply discover an existing layout.
968 d.addCallback(_change_basedir)
969 d.addCallback(lambda ign:
970 self._setup_and_upload())
971 d.addCallback(lambda ign:
972 self._add_server(server_number=2))
973 d.addCallback(lambda ign:
974 self._add_server(server_number=3))
975 d.addCallback(lambda ign:
976 self._add_server(server_number=1))
977 d.addCallback(_copy_shares)
978 d.addCallback(lambda ign:
979 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
980 d.addCallback(lambda ign:
981 self._add_server(server_number=0))
982 d.addCallback(_reset_encoding_parameters)
983 d.addCallback(lambda client:
984 client.upload(upload.Data("data" * 10000, convergence="")))
985 # Try the following layout
986 # server 0: shares 1-10
987 # server 1: share 1, read-only
988 # server 2: share 2, read-only
989 # server 3: share 3, read-only
990 d.addCallback(_change_basedir)
991 d.addCallback(lambda ign:
992 self._setup_and_upload())
993 d.addCallback(lambda ign:
994 self._add_server_with_share(server_number=2, share_number=0))
995 d.addCallback(lambda ign:
996 self._add_server_with_share(server_number=3, share_number=1,
997 readonly=True))
998 d.addCallback(lambda ign:
999 self._add_server_with_share(server_number=1, share_number=2,
1000 readonly=True))
1001 # Copy all of the other shares to server number 2
1002 d.addCallback(_copy_shares)
1003 # Remove server 0, and add another in its place
1004 d.addCallback(lambda ign:
1005 self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1006 d.addCallback(lambda ign:
1007 self._add_server_with_share(server_number=0, share_number=0,
1008 readonly=True))
1009 d.addCallback(_reset_encoding_parameters)
1010 d.addCallback(lambda client:
1011 client.upload(upload.Data("data" * 10000, convergence="")))
1015 def test_dropped_servers_in_encoder(self):
1016 def _set_basedir(ign=None):
1017 self.basedir = self.mktemp()
1018 _set_basedir()
1019 d = self._setup_and_upload()
1020 # Add 5 servers; only server 1 gets a copy of a share from the original.
1021 def _do_server_setup(ign):
1022 self._add_server_with_share(1, 1)
1023 self._add_server_with_share(2)
1024 self._add_server_with_share(3)
1025 self._add_server_with_share(4)
1026 self._add_server_with_share(5)
1027 d.addCallback(_do_server_setup)
1028 # remove the original server
1029 # (necessary to ensure that the Tahoe2PeerSelector will distribute
1031 def _remove_server(ign):
1032 server = self.g.servers_by_number[0]
1033 self.g.remove_server(server.my_nodeid)
1034 d.addCallback(_remove_server)
1035 # This should succeed.
1036 d.addCallback(lambda ign:
1037 self._do_upload_with_broken_servers(1))
1038 # Now, do the same thing over again, but drop 2 servers instead
1039 # of 1. This should fail.
1040 d.addCallback(_set_basedir)
1041 d.addCallback(lambda ign:
1042 self._setup_and_upload())
1043 d.addCallback(_do_server_setup)
1044 d.addCallback(_remove_server)
1045 d.addCallback(lambda ign:
1046 self.shouldFail(NotEnoughSharesError,
1047 "test_dropped_server_in_encoder", "",
1048 self._do_upload_with_broken_servers, 2))
1052 def test_servers_with_unique_shares(self):
1053 # servers_with_unique_shares expects a dict of
1054 # shnum => peerid as a preexisting shares argument.
1055 test1 = {
1056 1 : "server1",
1057 2 : "server2",
1058 3 : "server3",
1059 4 : "server4"
1060 }
1061 unique_servers = upload.servers_with_unique_shares(test1)
1062 self.failUnlessEqual(4, len(unique_servers))
1063 for server in ["server1", "server2", "server3", "server4"]:
1064 self.failUnlessIn(server, unique_servers)
1065 test1[4] = "server1"
1066 # Now there should only be 3 unique servers.
1067 unique_servers = upload.servers_with_unique_shares(test1)
1068 self.failUnlessEqual(3, len(unique_servers))
1069 for server in ["server1", "server2", "server3"]:
1070 self.failUnlessIn(server, unique_servers)
1071 # servers_with_unique_shares expects a set of PeerTracker
1072 # instances as a used_peers argument, but only uses the peerid
1073 # instance variable to assess uniqueness. So we feed it some fake
1074 # PeerTrackers whose only important characteristic is that they
1075 # have peerid set to something.
1076 class FakePeerTracker:
1077 pass
1078 trackers = []
1079 for server in ["server5", "server6", "server7", "server8"]:
1080 t = FakePeerTracker()
1081 t.peerid = server
1082 trackers.append(t)
1083 # Recall that there are 3 unique servers in test1. Since none of
1084 # those overlap with the ones in trackers, we should get 7 back
1085 unique_servers = upload.servers_with_unique_shares(test1, set(trackers))
1086 self.failUnlessEqual(7, len(unique_servers))
1087 expected_servers = ["server" + str(i) for i in xrange(1, 9)]
1088 expected_servers.remove("server4")
1089 for server in expected_servers:
1090 self.failUnlessIn(server, unique_servers)
1091 # Now add an overlapping server to trackers.
1092 t = FakePeerTracker()
1093 t.peerid = "server1"
1094 trackers.append(t)
1095 unique_servers = upload.servers_with_unique_shares(test1, set(trackers))
1096 self.failUnlessEqual(7, len(unique_servers))
1097 for server in expected_servers:
1098 self.failUnlessIn(server, unique_servers)
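# Editor's note: a hedged sketch inferred from the assertions above, not
# copied from upload.py: servers_with_unique_shares evidently returns the set
# of distinct peerids found in the preexisting-shares map plus the peerids of
# the used_peers trackers, deduplicating any overlap between the two.
def sketch_servers_with_unique_shares(existing_shares, used_peers=None):
    servers = set(existing_shares.values())
    for tracker in (used_peers or set()):
        servers.add(tracker.peerid)
    return servers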
1101 def test_shares_by_server(self):
1102 test = {
1103 1 : "server1",
1104 2 : "server2",
1105 3 : "server3",
1106 4 : "server4"
1107 }
1108 shares_by_server = upload.shares_by_server(test)
1109 self.failUnlessEqual(set([1]), shares_by_server["server1"])
1110 self.failUnlessEqual(set([2]), shares_by_server["server2"])
1111 self.failUnlessEqual(set([3]), shares_by_server["server3"])
1112 self.failUnlessEqual(set([4]), shares_by_server["server4"])
1113 test1 = {
1114 1 : "server1",
1115 2 : "server1",
1116 3 : "server1",
1117 4 : "server2",
1118 5 : "server2"
1119 }
1120 shares_by_server = upload.shares_by_server(test1)
1121 self.failUnlessEqual(set([1, 2, 3]), shares_by_server["server1"])
1122 self.failUnlessEqual(set([4, 5]), shares_by_server["server2"])
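# Editor's note: a hedged sketch of the behavior the assertions above pin
# down (not copied from upload.py): shares_by_server inverts a
# {shnum: peerid} map into {peerid: set of shnums}.
def sketch_shares_by_server(sharemap):
    result = {}
    for shnum, peerid in sharemap.items():
        result.setdefault(peerid, set()).add(shnum)
    return result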
1125 def _set_up_nodes_extra_config(self, clientdir):
1126 cfgfn = os.path.join(clientdir, "tahoe.cfg")
1127 oldcfg = open(cfgfn, "r").read()
1128 f = open(cfgfn, "wt")
1129 f.write(oldcfg)
1130 f.write("\n")
1131 f.write("[client]\n")
1132 f.write("shares.needed = 7\n")
1133 f.write("shares.total = 12\n")
1139 # upload with exactly 75 peers (shares_of_happiness)
1140 # have a download fail
1141 # cancel a download (need to implement more cancel stuff)