3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.python.failure import Failure
6 from twisted.python import log
7 from twisted.internet import defer
8 from foolscap.api import fireEventually
10 import allmydata # for __full_version__
11 from allmydata import uri, monitor, client
12 from allmydata.immutable import upload, encode
13 from allmydata.interfaces import FileTooLargeError, NoSharesError, \
15 from allmydata.util.assertutil import precondition
16 from allmydata.util.deferredutil import DeferredListShouldSucceed
17 from no_network import GridTestMixin
18 from common_util import ShouldFailMixin
19 from allmydata.storage_client import StorageFarmBroker
20 from allmydata.storage.server import storage_index_to_dir
24 def extract_uri(results):
27 # Some of these took longer than 480 seconds on Zandr's arm box, but this may
28 # have been due to an earlier test ERROR'ing out due to timeout, which seems
29 # to screw up subsequent tests.
32 class Uploadable(unittest.TestCase):
33 def shouldEqual(self, data, expected):
34 self.failUnless(isinstance(data, list))
36 self.failUnless(isinstance(e, str))
38 self.failUnlessEqual(s, expected)
40 def test_filehandle_random_key(self):
41 return self._test_filehandle(convergence=None)
43 def test_filehandle_convergent_encryption(self):
44 return self._test_filehandle(convergence="some convergence string")
46 def _test_filehandle(self, convergence):
48 u = upload.FileHandle(s, convergence=convergence)
50 d.addCallback(self.failUnlessEqual, 41)
51 d.addCallback(lambda res: u.read(1))
52 d.addCallback(self.shouldEqual, "a")
53 d.addCallback(lambda res: u.read(80))
54 d.addCallback(self.shouldEqual, "a"*40)
55 d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
56 d.addCallback(lambda res: s.close()) # that privilege is reserved for us
59 def test_filename(self):
60 basedir = "upload/Uploadable/test_filename"
62 fn = os.path.join(basedir, "file")
66 u = upload.FileName(fn, convergence=None)
68 d.addCallback(self.failUnlessEqual, 41)
69 d.addCallback(lambda res: u.read(1))
70 d.addCallback(self.shouldEqual, "a")
71 d.addCallback(lambda res: u.read(80))
72 d.addCallback(self.shouldEqual, "a"*40)
73 d.addCallback(lambda res: u.close())
78 u = upload.Data(s, convergence=None)
80 d.addCallback(self.failUnlessEqual, 41)
81 d.addCallback(lambda res: u.read(1))
82 d.addCallback(self.shouldEqual, "a")
83 d.addCallback(lambda res: u.read(80))
84 d.addCallback(self.shouldEqual, "a"*40)
85 d.addCallback(lambda res: u.close())
88 class ServerError(Exception):
92 def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
96 "max_segment_size": max_segsize,
98 self.node.DEFAULT_ENCODING_PARAMETERS = p
100 class FakeStorageServer:
101 def __init__(self, mode):
105 self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
106 { "maximum-immutable-share-size": 2**32 },
107 "application-version": str(allmydata.__full_version__),
110 self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
111 { "maximum-immutable-share-size": 10 },
112 "application-version": str(allmydata.__full_version__),
116 def callRemote(self, methname, *args, **kwargs):
118 meth = getattr(self, methname)
119 return meth(*args, **kwargs)
121 d.addCallback(lambda res: _call())
124 def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
125 sharenums, share_size, canary):
126 #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
127 if self.mode == "first-fail":
128 if self.queries == 0:
130 if self.mode == "second-fail":
131 if self.queries == 1:
134 if self.mode == "full":
136 elif self.mode == "already got them":
137 return (set(sharenums), {},)
139 for shnum in sharenums:
140 self.allocated.append( (storage_index, shnum) )
142 dict([( shnum, FakeBucketWriter(share_size) )
143 for shnum in sharenums]),
146 class FakeBucketWriter:
147 # a diagnostic version of storageserver.BucketWriter
148 def __init__(self, size):
149 self.data = StringIO()
153 def callRemote(self, methname, *args, **kwargs):
155 meth = getattr(self, "remote_" + methname)
156 return meth(*args, **kwargs)
158 d.addCallback(lambda res: _call())
    def remote_write(self, offset, data):
        """Write *data* into the in-memory share at *offset*.

        Mimics storageserver.BucketWriter.remote_write: rejects writes
        after close and writes that would run past the declared share size.
        """
        precondition(not self.closed)
        precondition(offset >= 0)
        # Enforce the size the bucket was allocated with; the message makes
        # overrun failures diagnosable from the test log.
        precondition(offset+len(data) <= self._size,
                     "offset=%d + data=%d > size=%d" %
                     (offset, len(data), self._size))
        self.data.seek(offset)
        self.data.write(data)
170 def remote_close(self):
171 precondition(not self.closed)
    def remote_abort(self):
        # No upload in these tests is expected to abort a bucket; log an
        # error so trial flags the test if it ever happens.
        log.err(RuntimeError("uh oh, I was asked to abort"))
178 DEFAULT_ENCODING_PARAMETERS = {"k":25,
181 "max_segment_size": 1*MiB,
183 def __init__(self, mode="good", num_servers=50):
184 self.num_servers = num_servers
185 if type(mode) is str:
186 mode = dict([i,mode] for i in range(num_servers))
187 peers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
188 for fakeid in range(self.num_servers) ]
189 self.storage_broker = StorageFarmBroker(None, permute_peers=True)
190 for (serverid, server) in peers:
191 self.storage_broker.test_add_server(serverid, server)
192 self.last_peers = [p[1] for p in peers]
194 def log(self, *args, **kwargs):
    def get_encoding_parameters(self):
        # Tests adjust DEFAULT_ENCODING_PARAMETERS directly (e.g. via
        # SetDEPMixin), so return the attribute rather than a copy.
        return self.DEFAULT_ENCODING_PARAMETERS
    def get_storage_broker(self):
        # The broker was populated with FakeStorageServer peers in __init__.
        return self.storage_broker
200 _secret_holder = client.SecretHolder("lease secret", "convergence secret")
202 class GotTooFarError(Exception):
205 class GiganticUploadable(upload.FileHandle):
206 def __init__(self, size):
210 def get_encryption_key(self):
211 return defer.succeed("\x00" * 16)
213 return defer.succeed(self._size)
214 def read(self, length):
215 left = self._size - self._fp
216 length = min(left, length)
218 if self._fp > 1000000:
219 # terminate the test early.
220 raise GotTooFarError("we shouldn't be allowed to get this far")
221 return defer.succeed(["\x00" * length])
226 Once upon a time, there was a beautiful princess named Buttercup. She lived
227 in a magical land where every file was stored securely among millions of
228 machines, and nobody ever worried about their data being lost ever again.
231 assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
235 SIZE_LARGE = len(DATA)
def upload_data(uploader, data):
    """Upload an in-memory string through *uploader* with a random key."""
    source = upload.Data(data, convergence=None)
    return uploader.upload(source)
def upload_filename(uploader, filename):
    """Upload the file named *filename* through *uploader* with a random key."""
    source = upload.FileName(filename, convergence=None)
    return uploader.upload(source)
def upload_filehandle(uploader, fh):
    """Upload the open filehandle *fh* through *uploader* with a random key."""
    source = upload.FileHandle(fh, convergence=None)
    return uploader.upload(source)
247 class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
249 self.node = FakeClient(mode="good")
250 self.u = upload.Uploader()
251 self.u.running = True
252 self.u.parent = self.node
254 def _check_small(self, newuri, size):
255 u = uri.from_string(newuri)
256 self.failUnless(isinstance(u, uri.LiteralFileURI))
257 self.failUnlessEqual(len(u.data), size)
259 def _check_large(self, newuri, size):
260 u = uri.from_string(newuri)
261 self.failUnless(isinstance(u, uri.CHKFileURI))
262 self.failUnless(isinstance(u.get_storage_index(), str))
263 self.failUnlessEqual(len(u.get_storage_index()), 16)
264 self.failUnless(isinstance(u.key, str))
265 self.failUnlessEqual(len(u.key), 16)
266 self.failUnlessEqual(u.size, size)
268 def get_data(self, size):
271 def test_too_large(self):
272 # we've removed the 4GiB share size limit (see ticket #346 for
273 # details), but still have an 8-byte field, so the limit is now
274 # 2**64, so make sure we reject files larger than that.
275 k = 3; happy = 7; n = 10
276 self.set_encoding_parameters(k, happy, n)
278 data1 = GiganticUploadable(big)
279 d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
280 "This file is too large to be uploaded (data_size)",
281 self.u.upload, data1)
282 data2 = GiganticUploadable(big-3)
283 d.addCallback(lambda res:
284 self.shouldFail(FileTooLargeError,
285 "test_too_large-data2",
286 "This file is too large to be uploaded (offsets)",
287 self.u.upload, data2))
288 # I don't know where the actual limit is.. it depends upon how large
289 # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
292 def test_data_zero(self):
293 data = self.get_data(SIZE_ZERO)
294 d = upload_data(self.u, data)
295 d.addCallback(extract_uri)
296 d.addCallback(self._check_small, SIZE_ZERO)
299 def test_data_small(self):
300 data = self.get_data(SIZE_SMALL)
301 d = upload_data(self.u, data)
302 d.addCallback(extract_uri)
303 d.addCallback(self._check_small, SIZE_SMALL)
306 def test_data_large(self):
307 data = self.get_data(SIZE_LARGE)
308 d = upload_data(self.u, data)
309 d.addCallback(extract_uri)
310 d.addCallback(self._check_large, SIZE_LARGE)
313 def test_data_large_odd_segments(self):
314 data = self.get_data(SIZE_LARGE)
315 segsize = int(SIZE_LARGE / 2.5)
316 # we want 3 segments, since that's not a power of two
317 self.set_encoding_parameters(25, 75, 100, segsize)
318 d = upload_data(self.u, data)
319 d.addCallback(extract_uri)
320 d.addCallback(self._check_large, SIZE_LARGE)
323 def test_filehandle_zero(self):
324 data = self.get_data(SIZE_ZERO)
325 d = upload_filehandle(self.u, StringIO(data))
326 d.addCallback(extract_uri)
327 d.addCallback(self._check_small, SIZE_ZERO)
330 def test_filehandle_small(self):
331 data = self.get_data(SIZE_SMALL)
332 d = upload_filehandle(self.u, StringIO(data))
333 d.addCallback(extract_uri)
334 d.addCallback(self._check_small, SIZE_SMALL)
337 def test_filehandle_large(self):
338 data = self.get_data(SIZE_LARGE)
339 d = upload_filehandle(self.u, StringIO(data))
340 d.addCallback(extract_uri)
341 d.addCallback(self._check_large, SIZE_LARGE)
344 def test_filename_zero(self):
345 fn = "Uploader-test_filename_zero.data"
347 data = self.get_data(SIZE_ZERO)
350 d = upload_filename(self.u, fn)
351 d.addCallback(extract_uri)
352 d.addCallback(self._check_small, SIZE_ZERO)
355 def test_filename_small(self):
356 fn = "Uploader-test_filename_small.data"
358 data = self.get_data(SIZE_SMALL)
361 d = upload_filename(self.u, fn)
362 d.addCallback(extract_uri)
363 d.addCallback(self._check_small, SIZE_SMALL)
366 def test_filename_large(self):
367 fn = "Uploader-test_filename_large.data"
369 data = self.get_data(SIZE_LARGE)
372 d = upload_filename(self.u, fn)
373 d.addCallback(extract_uri)
374 d.addCallback(self._check_large, SIZE_LARGE)
377 class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
378 def make_node(self, mode, num_servers=10):
379 self.node = FakeClient(mode, num_servers)
380 self.u = upload.Uploader()
381 self.u.running = True
382 self.u.parent = self.node
384 def _check_large(self, newuri, size):
385 u = uri.from_string(newuri)
386 self.failUnless(isinstance(u, uri.CHKFileURI))
387 self.failUnless(isinstance(u.get_storage_index(), str))
388 self.failUnlessEqual(len(u.get_storage_index()), 16)
389 self.failUnless(isinstance(u.key, str))
390 self.failUnlessEqual(len(u.key), 16)
391 self.failUnlessEqual(u.size, size)
393 def test_first_error(self):
394 mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
396 d = upload_data(self.u, DATA)
397 d.addCallback(extract_uri)
398 d.addCallback(self._check_large, SIZE_LARGE)
401 def test_first_error_all(self):
402 self.make_node("first-fail")
403 d = self.shouldFail(NoSharesError, "first_error_all",
404 "peer selection failed",
405 upload_data, self.u, DATA)
407 self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
408 # there should also be a 'last failure was' message
409 self.failUnlessIn("ServerError", str(f.value))
410 d.addCallback(_check)
413 def test_second_error(self):
414 # we want to make sure we make it to a third pass. This means that
415 # the first pass was insufficient to place all shares, and at least
416 # one of second pass servers (other than the last one) accepted a
417 # share (so we'll believe that a third pass will be useful). (if
418 # everyone but the last server throws an error, then we'll send all
419 # the remaining shares to the last server at the end of the second
420 # pass, and if that succeeds, we won't make it to a third pass).
422 # we can achieve this 97.5% of the time by using 40 servers, having
423 # 39 of them fail on the second request, leaving only one to succeed
424 # on the second request. (we need to keep the number of servers low
425 # enough to ensure a second pass with 100 shares).
426 mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
427 self.make_node(mode, 40)
428 d = upload_data(self.u, DATA)
429 d.addCallback(extract_uri)
430 d.addCallback(self._check_large, SIZE_LARGE)
433 def test_second_error_all(self):
434 self.make_node("second-fail")
435 d = self.shouldFail(NotEnoughSharesError, "second_error_all",
436 "peer selection failed",
437 upload_data, self.u, DATA)
439 self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
440 # there should also be a 'last failure was' message
441 self.failUnlessIn("ServerError", str(f.value))
442 d.addCallback(_check)
445 class FullServer(unittest.TestCase):
447 self.node = FakeClient(mode="full")
448 self.u = upload.Uploader()
449 self.u.running = True
450 self.u.parent = self.node
452 def _should_fail(self, f):
453 self.failUnless(isinstance(f, Failure) and f.check(NoSharesError), f)
455 def test_data_large(self):
457 d = upload_data(self.u, data)
458 d.addBoth(self._should_fail)
461 class PeerSelection(unittest.TestCase):
463 def make_client(self, num_servers=50):
464 self.node = FakeClient(mode="good", num_servers=num_servers)
465 self.u = upload.Uploader()
466 self.u.running = True
467 self.u.parent = self.node
469 def get_data(self, size):
472 def _check_large(self, newuri, size):
473 u = uri.from_string(newuri)
474 self.failUnless(isinstance(u, uri.CHKFileURI))
475 self.failUnless(isinstance(u.get_storage_index(), str))
476 self.failUnlessEqual(len(u.get_storage_index()), 16)
477 self.failUnless(isinstance(u.key, str))
478 self.failUnlessEqual(len(u.key), 16)
479 self.failUnlessEqual(u.size, size)
481 def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
485 "max_segment_size": max_segsize,
487 self.node.DEFAULT_ENCODING_PARAMETERS = p
489 def test_one_each(self):
490 # if we have 50 shares, and there are 50 peers, and they all accept a
491 # share, we should get exactly one share per peer
494 data = self.get_data(SIZE_LARGE)
495 self.set_encoding_parameters(25, 30, 50)
496 d = upload_data(self.u, data)
497 d.addCallback(extract_uri)
498 d.addCallback(self._check_large, SIZE_LARGE)
500 for p in self.node.last_peers:
501 allocated = p.allocated
502 self.failUnlessEqual(len(allocated), 1)
503 self.failUnlessEqual(p.queries, 1)
504 d.addCallback(_check)
507 def test_two_each(self):
508 # if we have 100 shares, and there are 50 peers, and they all accept
509 # all shares, we should get exactly two shares per peer
512 data = self.get_data(SIZE_LARGE)
513 self.set_encoding_parameters(50, 75, 100)
514 d = upload_data(self.u, data)
515 d.addCallback(extract_uri)
516 d.addCallback(self._check_large, SIZE_LARGE)
518 for p in self.node.last_peers:
519 allocated = p.allocated
520 self.failUnlessEqual(len(allocated), 2)
521 self.failUnlessEqual(p.queries, 2)
522 d.addCallback(_check)
525 def test_one_each_plus_one_extra(self):
526 # if we have 51 shares, and there are 50 peers, then one peer gets
527 # two shares and the rest get just one
530 data = self.get_data(SIZE_LARGE)
531 self.set_encoding_parameters(24, 41, 51)
532 d = upload_data(self.u, data)
533 d.addCallback(extract_uri)
534 d.addCallback(self._check_large, SIZE_LARGE)
538 for p in self.node.last_peers:
539 allocated = p.allocated
540 self.failUnless(len(allocated) in (1,2), len(allocated))
541 if len(allocated) == 1:
542 self.failUnlessEqual(p.queries, 1)
545 self.failUnlessEqual(p.queries, 2)
547 self.failUnlessEqual(len(got_one), 49)
548 self.failUnlessEqual(len(got_two), 1)
549 d.addCallback(_check)
552 def test_four_each(self):
553 # if we have 200 shares, and there are 50 peers, then each peer gets
554 # 4 shares. The design goal is to accomplish this with only two
558 data = self.get_data(SIZE_LARGE)
559 self.set_encoding_parameters(100, 150, 200)
560 d = upload_data(self.u, data)
561 d.addCallback(extract_uri)
562 d.addCallback(self._check_large, SIZE_LARGE)
564 for p in self.node.last_peers:
565 allocated = p.allocated
566 self.failUnlessEqual(len(allocated), 4)
567 self.failUnlessEqual(p.queries, 2)
568 d.addCallback(_check)
571 def test_three_of_ten(self):
572 # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
576 data = self.get_data(SIZE_LARGE)
577 self.set_encoding_parameters(3, 5, 10)
578 d = upload_data(self.u, data)
579 d.addCallback(extract_uri)
580 d.addCallback(self._check_large, SIZE_LARGE)
583 for p in self.node.last_peers:
584 allocated = p.allocated
585 counts[len(allocated)] = counts.get(len(allocated), 0) + 1
586 histogram = [counts.get(i, 0) for i in range(5)]
587 self.failUnlessEqual(histogram, [0,0,0,2,1])
588 d.addCallback(_check)
591 def test_some_big_some_small(self):
592 # 10 shares, 20 servers, but half the servers don't support a
593 # share-size large enough for our file
594 mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)])
595 self.node = FakeClient(mode, num_servers=20)
596 self.u = upload.Uploader()
597 self.u.running = True
598 self.u.parent = self.node
600 data = self.get_data(SIZE_LARGE)
601 self.set_encoding_parameters(3, 5, 10)
602 d = upload_data(self.u, data)
603 d.addCallback(extract_uri)
604 d.addCallback(self._check_large, SIZE_LARGE)
606 # we should have put one share each on the big peers, and zero
607 # shares on the small peers
609 for p in self.node.last_peers:
611 self.failUnlessEqual(len(p.allocated), 1)
612 elif p.mode == "small":
613 self.failUnlessEqual(len(p.allocated), 0)
614 total_allocated += len(p.allocated)
615 self.failUnlessEqual(total_allocated, 10)
616 d.addCallback(_check)
620 class StorageIndex(unittest.TestCase):
621 def test_params_must_matter(self):
622 DATA = "I am some data"
623 u = upload.Data(DATA, convergence="")
624 eu = upload.EncryptAnUploadable(u)
625 d1 = eu.get_storage_index()
627 # CHK means the same data should encrypt the same way
628 u = upload.Data(DATA, convergence="")
629 eu = upload.EncryptAnUploadable(u)
630 d1a = eu.get_storage_index()
632 # but if we use a different convergence string it should be different
633 u = upload.Data(DATA, convergence="wheee!")
634 eu = upload.EncryptAnUploadable(u)
635 d1salt1 = eu.get_storage_index()
637 # and if we add yet a different convergence it should be different again
638 u = upload.Data(DATA, convergence="NOT wheee!")
639 eu = upload.EncryptAnUploadable(u)
640 d1salt2 = eu.get_storage_index()
642 # and if we use the first string again it should be the same as last time
643 u = upload.Data(DATA, convergence="wheee!")
644 eu = upload.EncryptAnUploadable(u)
645 d1salt1a = eu.get_storage_index()
647 # and if we change the encoding parameters, it should be different (from the same convergence string with different encoding parameters)
648 u = upload.Data(DATA, convergence="")
649 u.encoding_param_k = u.default_encoding_param_k + 1
650 eu = upload.EncryptAnUploadable(u)
651 d2 = eu.get_storage_index()
653 # and if we use a random key, it should be different than the CHK
654 u = upload.Data(DATA, convergence=None)
655 eu = upload.EncryptAnUploadable(u)
656 d3 = eu.get_storage_index()
657 # and different from another instance
658 u = upload.Data(DATA, convergence=None)
659 eu = upload.EncryptAnUploadable(u)
660 d4 = eu.get_storage_index()
662 d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
664 si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
665 self.failUnlessEqual(si1, si1a)
666 self.failIfEqual(si1, si2)
667 self.failIfEqual(si1, si3)
668 self.failIfEqual(si1, si4)
669 self.failIfEqual(si3, si4)
670 self.failIfEqual(si1salt1, si1)
671 self.failIfEqual(si1salt1, si1salt2)
672 self.failIfEqual(si1salt2, si1)
673 self.failUnlessEqual(si1salt1, si1salt1a)
677 class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
679 def _do_upload_with_broken_servers(self, servers_to_break):
681 I act like a normal upload, but before I send the results of
682 Tahoe2PeerSelector to the Encoder, I break the first servers_to_break
683 PeerTrackers in the used_peers part of the return result.
685 assert self.g, "I tried to find a grid at self.g, but failed"
686 broker = self.g.clients[0].storage_broker
687 sh = self.g.clients[0]._secret_holder
688 data = upload.Data("data" * 10000, convergence="")
689 data.encoding_param_k = 3
690 data.encoding_param_happy = 4
691 data.encoding_param_n = 10
692 uploadable = upload.EncryptAnUploadable(data)
693 encoder = encode.Encoder()
694 encoder.set_encrypted_uploadable(uploadable)
695 status = upload.UploadStatus()
696 selector = upload.Tahoe2PeerSelector("dglev", "test", status)
697 storage_index = encoder.get_param("storage_index")
698 share_size = encoder.get_param("share_size")
699 block_size = encoder.get_param("block_size")
700 num_segments = encoder.get_param("num_segments")
701 d = selector.get_shareholders(broker, sh, storage_index,
702 share_size, block_size, num_segments,
704 def _have_shareholders((used_peers, already_peers)):
705 assert servers_to_break <= len(used_peers)
706 for index in xrange(servers_to_break):
707 server = list(used_peers)[index]
708 for share in server.buckets.keys():
709 server.buckets[share].abort()
711 servermap = already_peers.copy()
712 for peer in used_peers:
713 buckets.update(peer.buckets)
714 for bucket in peer.buckets:
715 servermap[bucket] = peer.peerid
716 encoder.set_shareholders(buckets, servermap)
719 d.addCallback(_have_shareholders)
722 def _add_server_with_share(self, server_number, share_number=None,
724 assert self.g, "I tried to find a grid at self.g, but failed"
725 assert self.shares, "I tried to find shares at self.shares, but failed"
726 ss = self.g.make_server(server_number, readonly)
727 self.g.add_server(server_number, ss)
729 # Copy share i from the directory associated with the first
730 # storage server to the directory associated with this one.
731 old_share_location = self.shares[share_number][2]
732 new_share_location = os.path.join(ss.storedir, "shares")
733 si = uri.from_string(self.uri).get_storage_index()
734 new_share_location = os.path.join(new_share_location,
735 storage_index_to_dir(si))
736 if not os.path.exists(new_share_location):
737 os.makedirs(new_share_location)
738 new_share_location = os.path.join(new_share_location,
740 shutil.copy(old_share_location, new_share_location)
741 shares = self.find_shares(self.uri)
742 # Make sure that the storage server has the share.
743 self.failUnless((share_number, ss.my_nodeid, new_share_location)
746 def _setup_and_upload(self):
748 I set up a NoNetworkGrid with a single server and client,
749 upload a file to it, store its uri in self.uri, and store its
750 sharedata in self.shares.
752 self.set_up_grid(num_clients=1, num_servers=1)
753 client = self.g.clients[0]
754 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
755 data = upload.Data("data" * 10000, convergence="")
757 d = client.upload(data)
760 d.addCallback(_store_uri)
761 d.addCallback(lambda ign:
762 self.find_shares(self.uri))
763 def _store_shares(shares):
765 d.addCallback(_store_shares)
768 def test_configure_parameters(self):
769 self.basedir = self.mktemp()
770 hooks = {0: self._set_up_nodes_extra_config}
771 self.set_up_grid(client_config_hooks=hooks)
772 c0 = self.g.clients[0]
775 u = upload.Data(DATA, convergence="")
777 d.addCallback(lambda ur: c0.create_node_from_uri(ur.uri))
778 m = monitor.Monitor()
779 d.addCallback(lambda fn: fn.check(m))
782 self.failUnlessEqual(data["count-shares-needed"], 7)
783 self.failUnlessEqual(data["count-shares-expected"], 12)
784 d.addCallback(_check)
787 def _setUp(self, ns):
788 # Used by test_happy_semantics and test_prexisting_share_behavior
789 # to set up the grid.
790 self.node = FakeClient(mode="good", num_servers=ns)
791 self.u = upload.Uploader()
792 self.u.running = True
793 self.u.parent = self.node
795 def test_happy_semantics(self):
797 DATA = upload.Data("kittens" * 10000, convergence="")
798 # These parameters are unsatisfiable with the client that we've made
        # -- we'll use them to test that the semantics work correctly.
800 self.set_encoding_parameters(k=3, happy=5, n=10)
801 d = self.shouldFail(NotEnoughSharesError, "test_happy_semantics",
802 "shares could only be placed on 2 servers "
803 "(5 were requested)",
805 # Let's reset the client to have 10 servers
806 d.addCallback(lambda ign:
808 # These parameters are satisfiable with the client we've made.
809 d.addCallback(lambda ign:
810 self.set_encoding_parameters(k=3, happy=5, n=10))
812 d.addCallback(lambda ign:
814 # Let's reset the client to have 7 servers
815 # (this is less than n, but more than h)
816 d.addCallback(lambda ign:
818 # These encoding parameters should still be satisfiable with our
820 d.addCallback(lambda ign:
821 self.set_encoding_parameters(k=3, happy=5, n=10))
822 # This, then, should work.
823 d.addCallback(lambda ign:
827 def test_problem_layouts(self):
828 self.basedir = self.mktemp()
829 # This scenario is at
830 # http://allmydata.org/trac/tahoe/ticket/778#comment:52
832 # The scenario in comment:52 proposes that we have a layout
837 # server 4: shares 2 - 10
838 # To get access to the shares, we will first upload to one
839 # server, which will then have shares 1 - 10. We'll then
840 # add three new servers, configure them to not accept any new
841 # shares, then write share 1 directly into the serverdir of each.
842 # Then each of servers 1 - 3 will report that they have share 1,
843 # and will not accept any new share, while server 4 will report that
844 # it has shares 2 - 10 and will accept new shares.
845 # We'll then set 'happy' = 4, and see that an upload fails
847 d = self._setup_and_upload()
848 d.addCallback(lambda ign:
849 self._add_server_with_share(1, 0, True))
850 d.addCallback(lambda ign:
851 self._add_server_with_share(2, 0, True))
852 d.addCallback(lambda ign:
853 self._add_server_with_share(3, 0, True))
854 # Remove the first share from server 0.
855 def _remove_share_0():
856 share_location = self.shares[0][2]
857 os.remove(share_location)
858 d.addCallback(lambda ign:
860 # Set happy = 4 in the client.
862 client = self.g.clients[0]
863 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
865 d.addCallback(lambda ign:
867 # Uploading data should fail
868 d.addCallback(lambda client:
869 self.shouldFail(NotEnoughSharesError, "test_happy_semantics",
870 "shares could only be placed on 1 servers "
871 "(4 were requested)",
872 client.upload, upload.Data("data" * 10000,
876 # This scenario is at
877 # http://allmydata.org/trac/tahoe/ticket/778#comment:53
879 # Set up the grid to have one server
880 def _change_basedir(ign):
881 self.basedir = self.mktemp()
882 d.addCallback(_change_basedir)
883 d.addCallback(lambda ign:
884 self._setup_and_upload())
885 # We want to have a layout like this:
889 # server 4: shares 1 - 10
890 # (this is an expansion of Zooko's example because it is easier
891 # to code, but it will fail in the same way)
892 # To start, we'll create a server with shares 1-10 of the data
893 # we're about to upload.
894 # Next, we'll add three new servers to our NoNetworkGrid. We'll add
895 # one share from our initial upload to each of these.
896 # The counterintuitive ordering of the share numbers is to deal with
897 # the permuting of these servers -- distributing the shares this
898 # way ensures that the Tahoe2PeerSelector sees them in the order
900 d.addCallback(lambda ign:
901 self._add_server_with_share(server_number=1, share_number=2))
902 d.addCallback(lambda ign:
903 self._add_server_with_share(server_number=2, share_number=0))
904 d.addCallback(lambda ign:
905 self._add_server_with_share(server_number=3, share_number=1))
906 # So, we now have the following layout:
907 # server 0: shares 1 - 10
911 # We want to change the 'happy' parameter in the client to 4.
912 # We then want to feed the upload process a list of peers that
913 # server 0 is at the front of, so we trigger Zooko's scenario.
914 # Ideally, a reupload of our original data should work.
915 def _reset_encoding_parameters(ign):
916 client = self.g.clients[0]
917 client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
919 d.addCallback(_reset_encoding_parameters)
920 # We need this to get around the fact that the old Data
921 # instance already has a happy parameter set.
922 d.addCallback(lambda client:
923 client.upload(upload.Data("data" * 10000, convergence="")))
927 def test_dropped_servers_in_encoder(self):
928 def _set_basedir(ign=None):
929 self.basedir = self.mktemp()
931 d = self._setup_and_upload();
932 # Add 5 servers, with one share each from the original
933 def _do_server_setup(ign):
934 self._add_server_with_share(1, 1)
935 self._add_server_with_share(2)
936 self._add_server_with_share(3)
937 self._add_server_with_share(4)
938 self._add_server_with_share(5)
939 d.addCallback(_do_server_setup)
940 # remove the original server
941 # (necessary to ensure that the Tahoe2PeerSelector will distribute
943 def _remove_server(ign):
944 server = self.g.servers_by_number[0]
945 self.g.remove_server(server.my_nodeid)
946 d.addCallback(_remove_server)
947 # This should succeed.
948 d.addCallback(lambda ign:
949 self._do_upload_with_broken_servers(1))
950 # Now, do the same thing over again, but drop 2 servers instead
951 # of 1. This should fail.
952 d.addCallback(_set_basedir)
953 d.addCallback(lambda ign:
954 self._setup_and_upload())
955 d.addCallback(_do_server_setup)
956 d.addCallback(_remove_server)
957 d.addCallback(lambda ign:
958 self.shouldFail(NotEnoughSharesError,
959 "test_dropped_server_in_encoder", "",
960 self._do_upload_with_broken_servers, 2))
964 def test_servers_with_unique_shares(self):
965 # servers_with_unique_shares expects a dict of
966 # shnum => peerid as a preexisting shares argument.
973 unique_servers = upload.servers_with_unique_shares(test1)
974 self.failUnlessEqual(4, len(unique_servers))
975 for server in ["server1", "server2", "server3", "server4"]:
976 self.failUnlessIn(server, unique_servers)
978 # Now there should only be 3 unique servers.
979 unique_servers = upload.servers_with_unique_shares(test1)
980 self.failUnlessEqual(3, len(unique_servers))
981 for server in ["server1", "server2", "server3"]:
982 self.failUnlessIn(server, unique_servers)
983 # servers_with_unique_shares expects a set of PeerTracker
984 # instances as a used_peers argument, but only uses the peerid
985 # instance variable to assess uniqueness. So we feed it some fake
986 # PeerTrackers whose only important characteristic is that they
987 # have peerid set to something.
988 class FakePeerTracker:
991 for server in ["server5", "server6", "server7", "server8"]:
992 t = FakePeerTracker()
995 # Recall that there are 3 unique servers in test1. Since none of
996 # those overlap with the ones in trackers, we should get 7 back
997 unique_servers = upload.servers_with_unique_shares(test1, set(trackers))
998 self.failUnlessEqual(7, len(unique_servers))
999 expected_servers = ["server" + str(i) for i in xrange(1, 9)]
1000 expected_servers.remove("server4")
1001 for server in expected_servers:
1002 self.failUnlessIn(server, unique_servers)
1003 # Now add an overlapping server to trackers.
1004 t = FakePeerTracker()
1005 t.peerid = "server1"
1007 unique_servers = upload.servers_with_unique_shares(test1, set(trackers))
1008 self.failUnlessEqual(7, len(unique_servers))
1009 for server in expected_servers:
1010 self.failUnlessIn(server, unique_servers)
1013 def _set_up_nodes_extra_config(self, clientdir):
1014 cfgfn = os.path.join(clientdir, "tahoe.cfg")
1015 oldcfg = open(cfgfn, "r").read()
1016 f = open(cfgfn, "wt")
1019 f.write("[client]\n")
1020 f.write("shares.needed = 7\n")
1021 f.write("shares.total = 12\n")
1027 # upload with exactly 75 peers (shares_of_happiness)
1028 # have a download fail
1029 # cancel a download (need to implement more cancel stuff)