3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.python.failure import Failure
6 from twisted.python import log
7 from twisted.internet import defer
8 from foolscap.api import fireEventually
10 import allmydata # for __full_version__
11 from allmydata import uri, monitor
12 from allmydata.immutable import upload
13 from allmydata.interfaces import IFileURI, FileTooLargeError, NotEnoughSharesError
14 from allmydata.util.assertutil import precondition
15 from allmydata.util.deferredutil import DeferredListShouldSucceed
16 from no_network import GridTestMixin
17 from common_util import ShouldFailMixin
18 from allmydata.storage_client import StorageFarmBroker
22 def extract_uri(results):
# NOTE(review): the body of extract_uri is on an elided line; judging by its
# use below (d.addCallback(extract_uri) followed by _check_small/_check_large
# on the result), it presumably returns the URI string from the upload
# results object -- confirm against the full file.
25 # Some of these took longer than 480 seconds on Zandr's arm box, but this may
26 # have been due to an earlier test ERROR'ing out due to timeout, which seems
27 # to screw up subsequent tests.
# Tests for the IUploadable implementations (upload.FileHandle,
# upload.FileName, upload.Data): each must report its size, serve
# incremental read() calls, and tolerate close().
# NOTE(review): several interior source lines are elided in this chunk
# (gaps in the original numbering), so some statements reference names
# bound on the missing lines (e.g. `e`, `s`, `d`).
30 class Uploadable(unittest.TestCase):
31 def shouldEqual(self, data, expected):
# read() returns a list of str chunks; `e` presumably iterates that list
# and `s` is the joined result (both bound on elided lines) -- confirm.
32 self.failUnless(isinstance(data, list))
34 self.failUnless(isinstance(e, str))
36 self.failUnlessEqual(s, expected)
38 def test_filehandle_random_key(self):
39 return self._test_filehandle(convergence=None)
41 def test_filehandle_convergent_encryption(self):
42 return self._test_filehandle(convergence="some convergence string")
44 def _test_filehandle(self, convergence):
# the uploadable reports a 41-byte size, then serves a 1-byte read and an
# over-long 80-byte read that yields only the remaining 40 bytes
46 u = upload.FileHandle(s, convergence=convergence)
48 d.addCallback(self.failUnlessEqual, 41)
49 d.addCallback(lambda res: u.read(1))
50 d.addCallback(self.shouldEqual, "a")
51 d.addCallback(lambda res: u.read(80))
52 d.addCallback(self.shouldEqual, "a"*40)
53 d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
54 d.addCallback(lambda res: s.close()) # that privilege is reserved for us
# same sequence, but reading from a file on disk via upload.FileName
57 def test_filename(self):
58 basedir = "upload/Uploadable/test_filename"
60 fn = os.path.join(basedir, "file")
64 u = upload.FileName(fn, convergence=None)
66 d.addCallback(self.failUnlessEqual, 41)
67 d.addCallback(lambda res: u.read(1))
68 d.addCallback(self.shouldEqual, "a")
69 d.addCallback(lambda res: u.read(80))
70 d.addCallback(self.shouldEqual, "a"*40)
71 d.addCallback(lambda res: u.close())
# (the enclosing `def test_data` line is elided) -- same sequence against
# an in-memory upload.Data uploadable
76 u = upload.Data(s, convergence=None)
78 d.addCallback(self.failUnlessEqual, 41)
79 d.addCallback(lambda res: u.read(1))
80 d.addCallback(self.shouldEqual, "a")
81 d.addCallback(lambda res: u.read(80))
82 d.addCallback(self.shouldEqual, "a"*40)
83 d.addCallback(lambda res: u.close())
# In-memory stand-in for a remote storage server, installed into the
# StorageFarmBroker by FakeClient below.  Behavior branches on `mode`
# (the branch conditions in __init__ are on elided lines): one branch
# advertises a 2**32 maximum-immutable-share-size, the other only 10
# bytes (the "small" servers used by test_some_big_some_small).
86 class FakeStorageServer:
87 def __init__(self, mode):
91 self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
92 { "maximum-immutable-share-size": 2**32 },
93 "application-version": str(allmydata.__full_version__),
96 self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
97 { "maximum-immutable-share-size": 10 },
98 "application-version": str(allmydata.__full_version__),
# Dispatch a "remote" call to the same-named local method.  The deferred
# `d` is created on an elided line (presumably via fireEventually) so the
# call happens asynchronously, like a real remote call would.
102 def callRemote(self, methname, *args, **kwargs):
104 meth = getattr(self, methname)
105 return meth(*args, **kwargs)
107 d.addCallback(lambda res: _call())
# Mimics the storage-server allocate_buckets API: returns a tuple of
# (set of already-stored sharenums, dict of sharenum -> bucket writer).
110 def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
111 sharenums, share_size, canary):
112 #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
# "full" servers accept nothing (the return statement is on an elided
# line); "already got them" claims every requested share is stored.
114 if self.mode == "full":
116 elif self.mode == "already got them":
117 return (set(sharenums), {},)
# otherwise record each allocation so tests can inspect distribution
119 for shnum in sharenums:
120 self.allocated.append( (storage_index, shnum) )
122 dict([( shnum, FakeBucketWriter(share_size) )
123 for shnum in sharenums]),
126 class FakeBucketWriter:
127 # a diagnostic version of storageserver.BucketWriter
128 def __init__(self, size):
# accumulates written bytes in memory; `closed` and `_size` are presumably
# initialized on the elided lines that follow -- confirm in the full file
129 self.data = StringIO()
# Route "remote" calls to the local remote_* methods; `d` is created on an
# elided line so the dispatch is asynchronous, like a real remote call.
133 def callRemote(self, methname, *args, **kwargs):
135 meth = getattr(self, "remote_" + methname)
136 return meth(*args, **kwargs)
138 d.addCallback(lambda res: _call())
141 def remote_write(self, offset, data):
142 precondition(not self.closed)
143 precondition(offset >= 0)
# every write must stay within the size declared at allocation time
144 precondition(offset+len(data) <= self._size,
145 "offset=%d + data=%d > size=%d" %
146 (offset, len(data), self._size))
147 self.data.seek(offset)
148 self.data.write(data)
150 def remote_close(self):
151 precondition(not self.closed)
# the tests here never expect an abort, so make one loudly visible
154 def remote_abort(self):
155 log.err("uh oh, I was asked to abort")
# NOTE(review): this is the interior of the FakeClient class -- its
# `class` header line is elided from this chunk.  It provides just enough
# of the client interface for upload.Uploader: default encoding
# parameters, a StorageFarmBroker populated with FakeStorageServers, and
# secret getters (whose bodies are elided).
158 DEFAULT_ENCODING_PARAMETERS = {"k":25,
161 "max_segment_size": 1*MiB,
163 def __init__(self, mode="good", num_servers=50):
165 self.num_servers = num_servers
# mixed grid: some peers are created "good" and some "small"; the
# condition selecting between them is on an elided line -- confirm
166 if mode == "some_big_some_small":
168 for fakeid in range(num_servers):
170 peers.append(("%20d" % fakeid, FakeStorageServer("good")))
172 peers.append(("%20d" % fakeid, FakeStorageServer("small")))
# homogeneous grid: every peer gets the same mode
174 peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),)
175 for fakeid in range(self.num_servers) ]
176 self.storage_broker = StorageFarmBroker()
177 for (serverid, server) in peers:
178 self.storage_broker.add_server(serverid, server)
# keep the server objects around so tests can inspect p.allocated etc.
179 self.last_peers = [p[1] for p in peers]
181 def log(self, *args, **kwargs):
183 def get_encoding_parameters(self):
184 return self.DEFAULT_ENCODING_PARAMETERS
185 def get_storage_broker(self):
186 return self.storage_broker
# lease-secret getters; bodies are on elided lines
188 def get_renewal_secret(self):
190 def get_cancel_secret(self):
# Sentinel raised by GiganticUploadable.read() to abort a size-limit test
# early instead of actually generating terabytes of data.  (The class
# body, presumably just `pass`, is on an elided line.)
193 class GotTooFarError(Exception):
# An uploadable that pretends to contain `size` NUL bytes without ever
# allocating them.  If more than ~1MB is actually read, the test has gone
# too far (a too-large upload should have been rejected before encoding),
# so read() raises GotTooFarError to terminate quickly.
196 class GiganticUploadable(upload.FileHandle):
197 def __init__(self, size):
# NOTE(review): `_size` and `_fp` (read position) are presumably
# initialized on the elided lines here -- confirm in the full file
201 def get_encryption_key(self):
202 return defer.succeed("\x00" * 16)
204 return defer.succeed(self._size)
205 def read(self, length):
# never hand out more than the remaining (pretend) bytes
206 left = self._size - self._fp
207 length = min(left, length)
209 if self._fp > 1000000:
210 # terminate the test early.
211 raise GotTooFarError("we shouldn't be allowed to get this far")
212 return defer.succeed(["\x00" * length])
217 Once upon a time, there was a beautiful princess named Buttercup. She lived
218 in a magical land where every file was stored securely among millions of
219 machines, and nobody ever worried about their data being lost ever again.
# DATA (the multi-line literal above; its opening line is elided) must be
# big enough to force a real CHK upload rather than an inline LIT URI.
222 assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
226 SIZE_LARGE = len(DATA)
def upload_data(uploader, data):
    """Convenience wrapper: upload in-memory *data* with a random key."""
    return uploader.upload(upload.Data(data, convergence=None))
def upload_filename(uploader, filename):
    """Convenience wrapper: upload the on-disk file *filename* with a random key."""
    return uploader.upload(upload.FileName(filename, convergence=None))
def upload_filehandle(uploader, fh):
    """Convenience wrapper: upload the open file-like object *fh* with a random key."""
    return uploader.upload(upload.FileHandle(fh, convergence=None))
# End-to-end Uploader tests against an all-"good" FakeClient grid: zero,
# small, and large payloads via Data / FileHandle / FileName sources.
# NOTE(review): the `def setUp(self):` line is elided; original lines
# 240-243 are its body.
238 class GoodServer(unittest.TestCase, ShouldFailMixin):
240 self.node = FakeClient(mode="good")
241 self.u = upload.Uploader()
242 self.u.running = True
243 self.u.parent = self.node
# replace the node's default k/happy/n/segsize for one test
245 def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
249 "max_segment_size": max_segsize,
251 self.node.DEFAULT_ENCODING_PARAMETERS = p
# small files should come back as inline LIT URIs holding the data itself
253 def _check_small(self, newuri, size):
255 self.failUnless(isinstance(u, uri.LiteralFileURI))
256 self.failUnlessEqual(len(u.data), size)
# large files get CHK URIs with a 16-byte storage index and 16-byte key
258 def _check_large(self, newuri, size):
260 self.failUnless(isinstance(u, uri.CHKFileURI))
261 self.failUnless(isinstance(u.storage_index, str))
262 self.failUnlessEqual(len(u.storage_index), 16)
263 self.failUnless(isinstance(u.key, str))
264 self.failUnlessEqual(len(u.key), 16)
265 self.failUnlessEqual(u.size, size)
267 def get_data(self, size):
270 def test_too_large(self):
271 # we've removed the 4GiB share size limit (see ticket #346 for
272 # details), but still have an 8-byte field, so the limit is now
273 # 2**64, so make sure we reject files larger than that.
274 k = 3; happy = 7; n = 10
275 self.set_encoding_parameters(k, happy, n)
# (`big` is bound on an elided line, presumably k*(2**57) or similar)
277 data1 = GiganticUploadable(big)
278 d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
279 "This file is too large to be uploaded (data_size)",
280 self.u.upload, data1)
281 data2 = GiganticUploadable(big-3)
282 d.addCallback(lambda res:
283 self.shouldFail(FileTooLargeError,
284 "test_too_large-data2",
285 "This file is too large to be uploaded (offsets)",
286 self.u.upload, data2))
287 # I don't know where the actual limit is.. it depends upon how large
288 # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
291 def test_data_zero(self):
292 data = self.get_data(SIZE_ZERO)
293 d = upload_data(self.u, data)
294 d.addCallback(extract_uri)
295 d.addCallback(self._check_small, SIZE_ZERO)
298 def test_data_small(self):
299 data = self.get_data(SIZE_SMALL)
300 d = upload_data(self.u, data)
301 d.addCallback(extract_uri)
302 d.addCallback(self._check_small, SIZE_SMALL)
305 def test_data_large(self):
306 data = self.get_data(SIZE_LARGE)
307 d = upload_data(self.u, data)
308 d.addCallback(extract_uri)
309 d.addCallback(self._check_large, SIZE_LARGE)
312 def test_data_large_odd_segments(self):
313 data = self.get_data(SIZE_LARGE)
314 segsize = int(SIZE_LARGE / 2.5)
315 # we want 3 segments, since that's not a power of two
316 self.set_encoding_parameters(25, 75, 100, segsize)
317 d = upload_data(self.u, data)
318 d.addCallback(extract_uri)
319 d.addCallback(self._check_large, SIZE_LARGE)
322 def test_filehandle_zero(self):
323 data = self.get_data(SIZE_ZERO)
324 d = upload_filehandle(self.u, StringIO(data))
325 d.addCallback(extract_uri)
326 d.addCallback(self._check_small, SIZE_ZERO)
329 def test_filehandle_small(self):
330 data = self.get_data(SIZE_SMALL)
331 d = upload_filehandle(self.u, StringIO(data))
332 d.addCallback(extract_uri)
333 d.addCallback(self._check_small, SIZE_SMALL)
336 def test_filehandle_large(self):
337 data = self.get_data(SIZE_LARGE)
338 d = upload_filehandle(self.u, StringIO(data))
339 d.addCallback(extract_uri)
340 d.addCallback(self._check_large, SIZE_LARGE)
# the filename variants write the data to disk first (on elided lines)
343 def test_filename_zero(self):
344 fn = "Uploader-test_filename_zero.data"
346 data = self.get_data(SIZE_ZERO)
349 d = upload_filename(self.u, fn)
350 d.addCallback(extract_uri)
351 d.addCallback(self._check_small, SIZE_ZERO)
354 def test_filename_small(self):
355 fn = "Uploader-test_filename_small.data"
357 data = self.get_data(SIZE_SMALL)
360 d = upload_filename(self.u, fn)
361 d.addCallback(extract_uri)
362 d.addCallback(self._check_small, SIZE_SMALL)
365 def test_filename_large(self):
366 fn = "Uploader-test_filename_large.data"
368 data = self.get_data(SIZE_LARGE)
371 d = upload_filename(self.u, fn)
372 d.addCallback(extract_uri)
373 d.addCallback(self._check_large, SIZE_LARGE)
# When every server is "full" (accepts no shares), a large upload must
# fail with NotEnoughSharesError.  NOTE(review): the `def setUp(self):`
# line is elided; original lines 378-381 are its body.
376 class FullServer(unittest.TestCase):
378 self.node = FakeClient(mode="full")
379 self.u = upload.Uploader()
380 self.u.running = True
381 self.u.parent = self.node
383 def _should_fail(self, f):
384 self.failUnless(isinstance(f, Failure) and f.check(NotEnoughSharesError), f)
386 def test_data_large(self):
388 d = upload_data(self.u, data)
# addBoth: a successful upload would also land in _should_fail and fail
389 d.addBoth(self._should_fail)
# Tests of how shares are distributed across peers by the Tahoe2 peer
# selector: each test uploads one large file with chosen k/happy/n and
# then inspects FakeStorageServer.allocated / .queries on every peer.
392 class PeerSelection(unittest.TestCase):
394 def make_client(self, num_servers=50):
395 self.node = FakeClient(mode="good", num_servers=num_servers)
396 self.u = upload.Uploader()
397 self.u.running = True
398 self.u.parent = self.node
400 def get_data(self, size):
# same CHK-URI sanity checks as GoodServer._check_large
403 def _check_large(self, newuri, size):
405 self.failUnless(isinstance(u, uri.CHKFileURI))
406 self.failUnless(isinstance(u.storage_index, str))
407 self.failUnlessEqual(len(u.storage_index), 16)
408 self.failUnless(isinstance(u.key, str))
409 self.failUnlessEqual(len(u.key), 16)
410 self.failUnlessEqual(u.size, size)
412 def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
416 "max_segment_size": max_segsize,
418 self.node.DEFAULT_ENCODING_PARAMETERS = p
420 def test_one_each(self):
421 # if we have 50 shares, and there are 50 peers, and they all accept a
422 # share, we should get exactly one share per peer
425 data = self.get_data(SIZE_LARGE)
426 self.set_encoding_parameters(25, 30, 50)
427 d = upload_data(self.u, data)
428 d.addCallback(extract_uri)
429 d.addCallback(self._check_large, SIZE_LARGE)
431 for p in self.node.last_peers:
432 allocated = p.allocated
433 self.failUnlessEqual(len(allocated), 1)
434 self.failUnlessEqual(p.queries, 1)
435 d.addCallback(_check)
438 def test_two_each(self):
439 # if we have 100 shares, and there are 50 peers, and they all accept
440 # all shares, we should get exactly two shares per peer
443 data = self.get_data(SIZE_LARGE)
444 self.set_encoding_parameters(50, 75, 100)
445 d = upload_data(self.u, data)
446 d.addCallback(extract_uri)
447 d.addCallback(self._check_large, SIZE_LARGE)
449 for p in self.node.last_peers:
450 allocated = p.allocated
451 self.failUnlessEqual(len(allocated), 2)
452 self.failUnlessEqual(p.queries, 2)
453 d.addCallback(_check)
456 def test_one_each_plus_one_extra(self):
457 # if we have 51 shares, and there are 50 peers, then one peer gets
458 # two shares and the rest get just one
461 data = self.get_data(SIZE_LARGE)
462 self.set_encoding_parameters(24, 41, 51)
463 d = upload_data(self.u, data)
464 d.addCallback(extract_uri)
465 d.addCallback(self._check_large, SIZE_LARGE)
# (got_one / got_two are accumulated on elided lines)
469 for p in self.node.last_peers:
470 allocated = p.allocated
471 self.failUnless(len(allocated) in (1,2), len(allocated))
472 if len(allocated) == 1:
473 self.failUnlessEqual(p.queries, 1)
476 self.failUnlessEqual(p.queries, 2)
478 self.failUnlessEqual(len(got_one), 49)
479 self.failUnlessEqual(len(got_two), 1)
480 d.addCallback(_check)
483 def test_four_each(self):
484 # if we have 200 shares, and there are 50 peers, then each peer gets
485 # 4 shares. The design goal is to accomplish this with only two
# (rest of the comment is on elided lines: ...queries per peer)
489 data = self.get_data(SIZE_LARGE)
490 self.set_encoding_parameters(100, 150, 200)
491 d = upload_data(self.u, data)
492 d.addCallback(extract_uri)
493 d.addCallback(self._check_large, SIZE_LARGE)
495 for p in self.node.last_peers:
496 allocated = p.allocated
497 self.failUnlessEqual(len(allocated), 4)
498 self.failUnlessEqual(p.queries, 2)
499 d.addCallback(_check)
502 def test_three_of_ten(self):
503 # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
# (rest of the comment and setup are on elided lines)
507 data = self.get_data(SIZE_LARGE)
508 self.set_encoding_parameters(3, 5, 10)
509 d = upload_data(self.u, data)
510 d.addCallback(extract_uri)
511 d.addCallback(self._check_large, SIZE_LARGE)
# build a histogram of shares-per-server and require the 3+3+4 split
514 for p in self.node.last_peers:
515 allocated = p.allocated
516 counts[len(allocated)] = counts.get(len(allocated), 0) + 1
517 histogram = [counts.get(i, 0) for i in range(5)]
518 self.failUnlessEqual(histogram, [0,0,0,2,1])
519 d.addCallback(_check)
522 def test_some_big_some_small(self):
523 # 10 shares, 20 servers, but half the servers don't support a
524 # share-size large enough for our file
525 self.node = FakeClient(mode="some_big_some_small", num_servers=20)
526 self.u = upload.Uploader()
527 self.u.running = True
528 self.u.parent = self.node
530 data = self.get_data(SIZE_LARGE)
531 self.set_encoding_parameters(3, 5, 10)
532 d = upload_data(self.u, data)
533 d.addCallback(extract_uri)
534 d.addCallback(self._check_large, SIZE_LARGE)
536 # we should have put one share each on the big peers, and zero
537 # shares on the small peers
539 for p in self.node.last_peers:
541 self.failUnlessEqual(len(p.allocated), 1)
542 elif p.mode == "small":
543 self.failUnlessEqual(len(p.allocated), 0)
544 total_allocated += len(p.allocated)
545 self.failUnlessEqual(total_allocated, 10)
546 d.addCallback(_check)
# Verifies that the storage index depends on everything it should:
# same data + same convergence string => same SI; different convergence,
# different encoding parameters, or a random key => different SI.
550 class StorageIndex(unittest.TestCase):
551 def test_params_must_matter(self):
552 DATA = "I am some data"
553 u = upload.Data(DATA, convergence="")
554 eu = upload.EncryptAnUploadable(u)
555 d1 = eu.get_storage_index()
557 # CHK means the same data should encrypt the same way
558 u = upload.Data(DATA, convergence="")
559 eu = upload.EncryptAnUploadable(u)
560 d1a = eu.get_storage_index()
562 # but if we use a different convergence string it should be different
563 u = upload.Data(DATA, convergence="wheee!")
564 eu = upload.EncryptAnUploadable(u)
565 d1salt1 = eu.get_storage_index()
567 # and if we add yet a different convergence it should be different again
568 u = upload.Data(DATA, convergence="NOT wheee!")
569 eu = upload.EncryptAnUploadable(u)
570 d1salt2 = eu.get_storage_index()
572 # and if we use the first string again it should be the same as last time
573 u = upload.Data(DATA, convergence="wheee!")
574 eu = upload.EncryptAnUploadable(u)
575 d1salt1a = eu.get_storage_index()
577 # and if we change the encoding parameters, it should be different (from the same convergence string with different encoding parameters)
578 u = upload.Data(DATA, convergence="")
579 u.encoding_param_k = u.default_encoding_param_k + 1
580 eu = upload.EncryptAnUploadable(u)
581 d2 = eu.get_storage_index()
583 # and if we use a random key, it should be different than the CHK
584 u = upload.Data(DATA, convergence=None)
585 eu = upload.EncryptAnUploadable(u)
586 d3 = eu.get_storage_index()
587 # and different from another instance
588 u = upload.Data(DATA, convergence=None)
589 eu = upload.EncryptAnUploadable(u)
590 d4 = eu.get_storage_index()
# gather all eight deferred SIs, then compare pairwise
592 d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
594 si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
595 self.failUnlessEqual(si1, si1a)
596 self.failIfEqual(si1, si2)
597 self.failIfEqual(si1, si3)
598 self.failIfEqual(si1, si4)
599 self.failIfEqual(si3, si4)
600 self.failIfEqual(si1salt1, si1)
601 self.failIfEqual(si1salt1, si1salt2)
602 self.failIfEqual(si1salt2, si1)
603 self.failUnlessEqual(si1salt1, si1salt1a)
# Confirms that shares.needed / shares.total from tahoe.cfg actually make
# it into the uploaded file's encoding (checked via a filecheck report).
607 class EncodingParameters(GridTestMixin, unittest.TestCase):
608 def test_configure_parameters(self):
609 self.basedir = self.mktemp()
# inject our tahoe.cfg edits into client 0 before the grid starts
610 hooks = {0: self._set_up_nodes_extra_config}
611 self.set_up_grid(client_config_hooks=hooks)
612 c0 = self.g.clients[0]
615 u = upload.Data(DATA, convergence="")
617 d.addCallback(lambda ur: c0.create_node_from_uri(ur.uri))
618 m = monitor.Monitor()
619 d.addCallback(lambda fn: fn.check(m))
# the check report must reflect the k=7 / N=12 we configured below
622 self.failUnlessEqual(data["count-shares-needed"], 7)
623 self.failUnlessEqual(data["count-shares-expected"], 12)
624 d.addCallback(_check)
# rewrite the client's tahoe.cfg with non-default encoding parameters
627 def _set_up_nodes_extra_config(self, clientdir):
628 cfgfn = os.path.join(clientdir, "tahoe.cfg")
629 oldcfg = open(cfgfn, "r").read()
630 f = open(cfgfn, "wt")
633 f.write("[client]\n")
634 f.write("shares.needed = 7\n")
635 f.write("shares.total = 12\n")
641 # upload with exactly 75 peers (shares_of_happiness)
642 # have a download fail
643 # cancel a download (need to implement more cancel stuff)