3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.python.failure import Failure
6 from twisted.python import log
7 from twisted.internet import defer
8 from foolscap import eventual
10 import allmydata # for __full_version__
11 from allmydata import uri, monitor
12 from allmydata.immutable import upload
13 from allmydata.interfaces import IFileURI, FileTooLargeError, NotEnoughSharesError
14 from allmydata.util.assertutil import precondition
15 from allmydata.util.deferredutil import DeferredListShouldSucceed
16 from common import SystemTestMixin
17 from common_util import ShouldFailMixin
21 def extract_uri(results):
24 class Uploadable(unittest.TestCase):
25 def shouldEqual(self, data, expected):
26 self.failUnless(isinstance(data, list))
28 self.failUnless(isinstance(e, str))
30 self.failUnlessEqual(s, expected)
32 def test_filehandle_random_key(self):
33 return self._test_filehandle(convergence=None)
35 def test_filehandle_convergent_encryption(self):
36 return self._test_filehandle(convergence="some convergence string")
38 def _test_filehandle(self, convergence):
40 u = upload.FileHandle(s, convergence=convergence)
42 d.addCallback(self.failUnlessEqual, 41)
43 d.addCallback(lambda res: u.read(1))
44 d.addCallback(self.shouldEqual, "a")
45 d.addCallback(lambda res: u.read(80))
46 d.addCallback(self.shouldEqual, "a"*40)
47 d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
48 d.addCallback(lambda res: s.close()) # that privilege is reserved for us
51 def test_filename(self):
52 basedir = "upload/Uploadable/test_filename"
54 fn = os.path.join(basedir, "file")
58 u = upload.FileName(fn, convergence=None)
60 d.addCallback(self.failUnlessEqual, 41)
61 d.addCallback(lambda res: u.read(1))
62 d.addCallback(self.shouldEqual, "a")
63 d.addCallback(lambda res: u.read(80))
64 d.addCallback(self.shouldEqual, "a"*40)
65 d.addCallback(lambda res: u.close())
70 u = upload.Data(s, convergence=None)
72 d.addCallback(self.failUnlessEqual, 41)
73 d.addCallback(lambda res: u.read(1))
74 d.addCallback(self.shouldEqual, "a")
75 d.addCallback(lambda res: u.read(80))
76 d.addCallback(self.shouldEqual, "a"*40)
77 d.addCallback(lambda res: u.close())
80 class FakeStorageServer:
81 def __init__(self, mode):
85 self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
86 { "maximum-immutable-share-size": 2**32 },
87 "application-version": str(allmydata.__full_version__),
90 self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
91 { "maximum-immutable-share-size": 10 },
92 "application-version": str(allmydata.__full_version__),
96 def callRemote(self, methname, *args, **kwargs):
98 meth = getattr(self, methname)
99 return meth(*args, **kwargs)
100 d = eventual.fireEventually()
101 d.addCallback(lambda res: _call())
104 def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
105 sharenums, share_size, canary):
106 #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
108 if self.mode == "full":
110 elif self.mode == "already got them":
111 return (set(sharenums), {},)
113 for shnum in sharenums:
114 self.allocated.append( (storage_index, shnum) )
116 dict([( shnum, FakeBucketWriter(share_size) )
117 for shnum in sharenums]),
120 class FakeBucketWriter:
121 # a diagnostic version of storageserver.BucketWriter
122 def __init__(self, size):
123 self.data = StringIO()
127 def callRemote(self, methname, *args, **kwargs):
129 meth = getattr(self, "remote_" + methname)
130 return meth(*args, **kwargs)
131 d = eventual.fireEventually()
132 d.addCallback(lambda res: _call())
135 def remote_write(self, offset, data):
136 precondition(not self.closed)
137 precondition(offset >= 0)
138 precondition(offset+len(data) <= self._size,
139 "offset=%d + data=%d > size=%d" %
140 (offset, len(data), self._size))
141 self.data.seek(offset)
142 self.data.write(data)
144 def remote_close(self):
145 precondition(not self.closed)
    def remote_abort(self):
        # These tests never expect an abort; log it loudly so an unexpected
        # abort shows up in the trial log instead of passing silently.
        log.err("uh oh, I was asked to abort")
152 DEFAULT_ENCODING_PARAMETERS = {"k":25,
155 "max_segment_size": 1*MiB,
157 def __init__(self, mode="good", num_servers=50):
159 self.num_servers = num_servers
160 if mode == "some_big_some_small":
162 for fakeid in range(num_servers):
164 self.peers.append( ("%20d" % fakeid,
165 FakeStorageServer("good")) )
167 self.peers.append( ("%20d" % fakeid,
168 FakeStorageServer("small")) )
170 self.peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),)
171 for fakeid in range(self.num_servers) ]
172 def log(self, *args, **kwargs):
174 def get_permuted_peers(self, storage_index, include_myself):
175 self.last_peers = [p[1] for p in self.peers]
177 def get_encoding_parameters(self):
178 return self.DEFAULT_ENCODING_PARAMETERS
180 def get_renewal_secret(self):
182 def get_cancel_secret(self):
185 class GiganticUploadable(upload.FileHandle):
186 def __init__(self, size):
190 def get_encryption_key(self):
191 return defer.succeed("\x00" * 16)
193 return defer.succeed(self._size)
194 def read(self, length):
195 left = self._size - self._fp
196 length = min(left, length)
198 if self._fp > 1000000:
199 # terminate the test early.
200 raise RuntimeError("we shouldn't be allowed to get this far")
201 return defer.succeed(["\x00" * length])
206 Once upon a time, there was a beautiful princess named Buttercup. She lived
207 in a magical land where every file was stored securely among millions of
208 machines, and nobody ever worried about their data being lost ever again.
211 assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
215 SIZE_LARGE = len(DATA)
def upload_data(uploader, data):
    """Upload an in-memory byte string via *uploader* with a random key."""
    return uploader.upload(upload.Data(data, convergence=None))
def upload_filename(uploader, filename):
    """Upload the contents of the named file via *uploader* with a random key."""
    return uploader.upload(upload.FileName(filename, convergence=None))
def upload_filehandle(uploader, fh):
    """Upload from an open file-like object via *uploader* with a random key."""
    return uploader.upload(upload.FileHandle(fh, convergence=None))
227 class GoodServer(unittest.TestCase, ShouldFailMixin):
229 self.node = FakeClient(mode="good")
230 self.u = upload.Uploader()
231 self.u.running = True
232 self.u.parent = self.node
234 def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
238 "max_segment_size": max_segsize,
240 self.node.DEFAULT_ENCODING_PARAMETERS = p
242 def _check_small(self, newuri, size):
244 self.failUnless(isinstance(u, uri.LiteralFileURI))
245 self.failUnlessEqual(len(u.data), size)
247 def _check_large(self, newuri, size):
249 self.failUnless(isinstance(u, uri.CHKFileURI))
250 self.failUnless(isinstance(u.storage_index, str))
251 self.failUnlessEqual(len(u.storage_index), 16)
252 self.failUnless(isinstance(u.key, str))
253 self.failUnlessEqual(len(u.key), 16)
254 self.failUnlessEqual(u.size, size)
256 def get_data(self, size):
259 def test_too_large(self):
260 # we've removed the 4GiB share size limit (see ticket #346 for
261 # details), but still have an 8-byte field, so the limit is now
262 # 2**64, so make sure we reject files larger than that.
263 k = 3; happy = 7; n = 10
264 self.set_encoding_parameters(k, happy, n)
266 data1 = GiganticUploadable(big)
267 d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
268 "This file is too large to be uploaded (data_size)",
269 self.u.upload, data1)
270 data2 = GiganticUploadable(big-3)
271 d.addCallback(lambda res:
272 self.shouldFail(FileTooLargeError,
273 "test_too_large-data2",
274 "This file is too large to be uploaded (offsets)",
275 self.u.upload, data2))
276 # I don't know where the actual limit is.. it depends upon how large
277 # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
280 def test_data_zero(self):
281 data = self.get_data(SIZE_ZERO)
282 d = upload_data(self.u, data)
283 d.addCallback(extract_uri)
284 d.addCallback(self._check_small, SIZE_ZERO)
287 def test_data_small(self):
288 data = self.get_data(SIZE_SMALL)
289 d = upload_data(self.u, data)
290 d.addCallback(extract_uri)
291 d.addCallback(self._check_small, SIZE_SMALL)
294 def test_data_large(self):
295 data = self.get_data(SIZE_LARGE)
296 d = upload_data(self.u, data)
297 d.addCallback(extract_uri)
298 d.addCallback(self._check_large, SIZE_LARGE)
301 def test_data_large_odd_segments(self):
302 data = self.get_data(SIZE_LARGE)
303 segsize = int(SIZE_LARGE / 2.5)
304 # we want 3 segments, since that's not a power of two
305 self.set_encoding_parameters(25, 75, 100, segsize)
306 d = upload_data(self.u, data)
307 d.addCallback(extract_uri)
308 d.addCallback(self._check_large, SIZE_LARGE)
311 def test_filehandle_zero(self):
312 data = self.get_data(SIZE_ZERO)
313 d = upload_filehandle(self.u, StringIO(data))
314 d.addCallback(extract_uri)
315 d.addCallback(self._check_small, SIZE_ZERO)
318 def test_filehandle_small(self):
319 data = self.get_data(SIZE_SMALL)
320 d = upload_filehandle(self.u, StringIO(data))
321 d.addCallback(extract_uri)
322 d.addCallback(self._check_small, SIZE_SMALL)
325 def test_filehandle_large(self):
326 data = self.get_data(SIZE_LARGE)
327 d = upload_filehandle(self.u, StringIO(data))
328 d.addCallback(extract_uri)
329 d.addCallback(self._check_large, SIZE_LARGE)
332 def test_filename_zero(self):
333 fn = "Uploader-test_filename_zero.data"
335 data = self.get_data(SIZE_ZERO)
338 d = upload_filename(self.u, fn)
339 d.addCallback(extract_uri)
340 d.addCallback(self._check_small, SIZE_ZERO)
343 def test_filename_small(self):
344 fn = "Uploader-test_filename_small.data"
346 data = self.get_data(SIZE_SMALL)
349 d = upload_filename(self.u, fn)
350 d.addCallback(extract_uri)
351 d.addCallback(self._check_small, SIZE_SMALL)
354 def test_filename_large(self):
355 fn = "Uploader-test_filename_large.data"
357 data = self.get_data(SIZE_LARGE)
360 d = upload_filename(self.u, fn)
361 d.addCallback(extract_uri)
362 d.addCallback(self._check_large, SIZE_LARGE)
365 class FullServer(unittest.TestCase):
367 self.node = FakeClient(mode="full")
368 self.u = upload.Uploader()
369 self.u.running = True
370 self.u.parent = self.node
372 def _should_fail(self, f):
373 self.failUnless(isinstance(f, Failure) and f.check(NotEnoughSharesError), f)
375 def test_data_large(self):
377 d = upload_data(self.u, data)
378 d.addBoth(self._should_fail)
381 class PeerSelection(unittest.TestCase):
383 def make_client(self, num_servers=50):
384 self.node = FakeClient(mode="good", num_servers=num_servers)
385 self.u = upload.Uploader()
386 self.u.running = True
387 self.u.parent = self.node
389 def get_data(self, size):
392 def _check_large(self, newuri, size):
394 self.failUnless(isinstance(u, uri.CHKFileURI))
395 self.failUnless(isinstance(u.storage_index, str))
396 self.failUnlessEqual(len(u.storage_index), 16)
397 self.failUnless(isinstance(u.key, str))
398 self.failUnlessEqual(len(u.key), 16)
399 self.failUnlessEqual(u.size, size)
401 def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
405 "max_segment_size": max_segsize,
407 self.node.DEFAULT_ENCODING_PARAMETERS = p
409 def test_one_each(self):
410 # if we have 50 shares, and there are 50 peers, and they all accept a
411 # share, we should get exactly one share per peer
414 data = self.get_data(SIZE_LARGE)
415 self.set_encoding_parameters(25, 30, 50)
416 d = upload_data(self.u, data)
417 d.addCallback(extract_uri)
418 d.addCallback(self._check_large, SIZE_LARGE)
420 for p in self.node.last_peers:
421 allocated = p.allocated
422 self.failUnlessEqual(len(allocated), 1)
423 self.failUnlessEqual(p.queries, 1)
424 d.addCallback(_check)
427 def test_two_each(self):
428 # if we have 100 shares, and there are 50 peers, and they all accept
429 # all shares, we should get exactly two shares per peer
432 data = self.get_data(SIZE_LARGE)
433 self.set_encoding_parameters(50, 75, 100)
434 d = upload_data(self.u, data)
435 d.addCallback(extract_uri)
436 d.addCallback(self._check_large, SIZE_LARGE)
438 for p in self.node.last_peers:
439 allocated = p.allocated
440 self.failUnlessEqual(len(allocated), 2)
441 self.failUnlessEqual(p.queries, 2)
442 d.addCallback(_check)
445 def test_one_each_plus_one_extra(self):
446 # if we have 51 shares, and there are 50 peers, then one peer gets
447 # two shares and the rest get just one
450 data = self.get_data(SIZE_LARGE)
451 self.set_encoding_parameters(24, 41, 51)
452 d = upload_data(self.u, data)
453 d.addCallback(extract_uri)
454 d.addCallback(self._check_large, SIZE_LARGE)
458 for p in self.node.last_peers:
459 allocated = p.allocated
460 self.failUnless(len(allocated) in (1,2), len(allocated))
461 if len(allocated) == 1:
462 self.failUnlessEqual(p.queries, 1)
465 self.failUnlessEqual(p.queries, 2)
467 self.failUnlessEqual(len(got_one), 49)
468 self.failUnlessEqual(len(got_two), 1)
469 d.addCallback(_check)
472 def test_four_each(self):
473 # if we have 200 shares, and there are 50 peers, then each peer gets
474 # 4 shares. The design goal is to accomplish this with only two
478 data = self.get_data(SIZE_LARGE)
479 self.set_encoding_parameters(100, 150, 200)
480 d = upload_data(self.u, data)
481 d.addCallback(extract_uri)
482 d.addCallback(self._check_large, SIZE_LARGE)
484 for p in self.node.last_peers:
485 allocated = p.allocated
486 self.failUnlessEqual(len(allocated), 4)
487 self.failUnlessEqual(p.queries, 2)
488 d.addCallback(_check)
491 def test_three_of_ten(self):
492 # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
496 data = self.get_data(SIZE_LARGE)
497 self.set_encoding_parameters(3, 5, 10)
498 d = upload_data(self.u, data)
499 d.addCallback(extract_uri)
500 d.addCallback(self._check_large, SIZE_LARGE)
503 for p in self.node.last_peers:
504 allocated = p.allocated
505 counts[len(allocated)] = counts.get(len(allocated), 0) + 1
506 histogram = [counts.get(i, 0) for i in range(5)]
507 self.failUnlessEqual(histogram, [0,0,0,2,1])
508 d.addCallback(_check)
511 def test_some_big_some_small(self):
512 # 10 shares, 20 servers, but half the servers don't support a
513 # share-size large enough for our file
514 self.node = FakeClient(mode="some_big_some_small", num_servers=20)
515 self.u = upload.Uploader()
516 self.u.running = True
517 self.u.parent = self.node
519 data = self.get_data(SIZE_LARGE)
520 self.set_encoding_parameters(3, 5, 10)
521 d = upload_data(self.u, data)
522 d.addCallback(extract_uri)
523 d.addCallback(self._check_large, SIZE_LARGE)
525 # we should have put one share each on the big peers, and zero
526 # shares on the small peers
528 for p in self.node.last_peers:
530 self.failUnlessEqual(len(p.allocated), 1)
531 elif p.mode == "small":
532 self.failUnlessEqual(len(p.allocated), 0)
533 total_allocated += len(p.allocated)
534 self.failUnlessEqual(total_allocated, 10)
535 d.addCallback(_check)
539 class StorageIndex(unittest.TestCase):
540 def test_params_must_matter(self):
541 DATA = "I am some data"
542 u = upload.Data(DATA, convergence="")
543 eu = upload.EncryptAnUploadable(u)
544 d1 = eu.get_storage_index()
546 # CHK means the same data should encrypt the same way
547 u = upload.Data(DATA, convergence="")
548 eu = upload.EncryptAnUploadable(u)
549 d1a = eu.get_storage_index()
551 # but if we use a different convergence string it should be different
552 u = upload.Data(DATA, convergence="wheee!")
553 eu = upload.EncryptAnUploadable(u)
554 d1salt1 = eu.get_storage_index()
556 # and if we add yet a different convergence it should be different again
557 u = upload.Data(DATA, convergence="NOT wheee!")
558 eu = upload.EncryptAnUploadable(u)
559 d1salt2 = eu.get_storage_index()
561 # and if we use the first string again it should be the same as last time
562 u = upload.Data(DATA, convergence="wheee!")
563 eu = upload.EncryptAnUploadable(u)
564 d1salt1a = eu.get_storage_index()
566 # and if we change the encoding parameters, it should be different (from the same convergence string with different encoding parameters)
567 u = upload.Data(DATA, convergence="")
568 u.encoding_param_k = u.default_encoding_param_k + 1
569 eu = upload.EncryptAnUploadable(u)
570 d2 = eu.get_storage_index()
572 # and if we use a random key, it should be different than the CHK
573 u = upload.Data(DATA, convergence=None)
574 eu = upload.EncryptAnUploadable(u)
575 d3 = eu.get_storage_index()
576 # and different from another instance
577 u = upload.Data(DATA, convergence=None)
578 eu = upload.EncryptAnUploadable(u)
579 d4 = eu.get_storage_index()
581 d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
583 si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
584 self.failUnlessEqual(si1, si1a)
585 self.failIfEqual(si1, si2)
586 self.failIfEqual(si1, si3)
587 self.failIfEqual(si1, si4)
588 self.failIfEqual(si3, si4)
589 self.failIfEqual(si1salt1, si1)
590 self.failIfEqual(si1salt1, si1salt2)
591 self.failIfEqual(si1salt2, si1)
592 self.failUnlessEqual(si1salt1, si1salt1a)
596 class EncodingParameters(SystemTestMixin, unittest.TestCase):
597 def test_configure_parameters(self):
598 self.basedir = self.mktemp()
600 u = upload.Data(DATA, convergence="")
601 d = self.set_up_nodes()
602 d.addCallback(lambda res: self.clients[0].upload(u))
603 d.addCallback(lambda ur: self.clients[0].create_node_from_uri(ur.uri))
604 m = monitor.Monitor()
605 d.addCallback(lambda fn: fn.check(m))
608 self.failUnlessEqual(data["count-shares-needed"], 7)
609 self.failUnlessEqual(data["count-shares-expected"], 12)
610 d.addCallback(_check)
613 def _set_up_nodes_extra_config(self):
614 f = open(os.path.join(self.getdir("client0"), "tahoe.cfg"), "wt")
616 f.write("[client]\n")
617 f.write("shares.needed = 7\n")
618 f.write("shares.total = 12\n")
623 # upload with exactly 75 peers (shares_of_happiness)
624 # have a download fail
625 # cancel a download (need to implement more cancel stuff)