3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.python.failure import Failure
6 from twisted.python import log
7 from twisted.internet import defer
8 from foolscap.api import fireEventually
10 import allmydata # for __full_version__
11 from allmydata import uri, monitor
12 from allmydata.immutable import upload
13 from allmydata.interfaces import IFileURI, FileTooLargeError, NotEnoughSharesError
14 from allmydata.util.assertutil import precondition
15 from allmydata.util.deferredutil import DeferredListShouldSucceed
16 from no_network import GridTestMixin
17 from common_util import ShouldFailMixin
18 from allmydata.storage_client import StorageFarmBroker
# Helper: extract the URI string from an upload-results object.
# NOTE(review): the body is elided in this listing (original line 23) --
# presumably returns results.uri; confirm against the full file.
22 def extract_uri(results):
# Tests for the IUploadable wrappers (FileHandle, FileName, Data): each
# must report its size via get_size(), then serve read() calls in order.
# NOTE(review): several original lines are elided from this listing
# (e.g. 28, 30, 40, 42) -- loop headers and setup such as building the
# StringIO and the get_size() deferred are missing; confirm before editing.
25 class Uploadable(unittest.TestCase):
26 def shouldEqual(self, data, expected):
# read() results arrive as a list of str chunks; joined together they
# must equal the expected string (the join line is elided here).
27 self.failUnless(isinstance(data, list))
29 self.failUnless(isinstance(e, str))
31 self.failUnlessEqual(s, expected)
33 def test_filehandle_random_key(self):
# convergence=None means a fresh random encryption key per upload.
34 return self._test_filehandle(convergence=None)
36 def test_filehandle_convergent_encryption(self):
# a convergence string makes the key deterministic for identical data.
37 return self._test_filehandle(convergence="some convergence string")
39 def _test_filehandle(self, convergence):
# exercise FileHandle: size should be 41 bytes, then reads served in order.
41 u = upload.FileHandle(s, convergence=convergence)
43 d.addCallback(self.failUnlessEqual, 41)
44 d.addCallback(lambda res: u.read(1))
45 d.addCallback(self.shouldEqual, "a")
46 d.addCallback(lambda res: u.read(80))
47 d.addCallback(self.shouldEqual, "a"*40)
48 d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
49 d.addCallback(lambda res: s.close()) # that privilege is reserved for us
52 def test_filename(self):
# same read protocol as above, but reading from a file on disk.
53 basedir = "upload/Uploadable/test_filename"
55 fn = os.path.join(basedir, "file")
59 u = upload.FileName(fn, convergence=None)
61 d.addCallback(self.failUnlessEqual, 41)
62 d.addCallback(lambda res: u.read(1))
63 d.addCallback(self.shouldEqual, "a")
64 d.addCallback(lambda res: u.read(80))
65 d.addCallback(self.shouldEqual, "a"*40)
66 d.addCallback(lambda res: u.close())
# NOTE(review): the def line for this third case (upload.Data) is elided
# from this listing (original lines 68-70 missing).
71 u = upload.Data(s, convergence=None)
73 d.addCallback(self.failUnlessEqual, 41)
74 d.addCallback(lambda res: u.read(1))
75 d.addCallback(self.shouldEqual, "a")
76 d.addCallback(lambda res: u.read(80))
77 d.addCallback(self.shouldEqual, "a"*40)
78 d.addCallback(lambda res: u.close())
# In-memory stand-in for a remote storage server. The mode string selects
# behavior: "small" advertises a tiny maximum share size, "full" and
# "already got them" alter allocate_buckets() responses.
# NOTE(review): several lines are elided in this listing (e.g. attribute
# setup around original 83-85, and the deferred built before 102).
81 class FakeStorageServer:
82 def __init__(self, mode):
# advertised protocol version dict: "small" servers cap immutable
# shares at 10 bytes; everyone else allows up to 2**32.
86 self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
87 { "maximum-immutable-share-size": 2**32 },
88 "application-version": str(allmydata.__full_version__),
91 self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
92 { "maximum-immutable-share-size": 10 },
93 "application-version": str(allmydata.__full_version__),
97 def callRemote(self, methname, *args, **kwargs):
# dispatch "remote" calls to same-named local methods, behind a
# deferred so callers cannot rely on synchronous answers.
99 meth = getattr(self, methname)
100 return meth(*args, **kwargs)
102 d.addCallback(lambda res: _call())
105 def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
106 sharenums, share_size, canary):
107 #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
109 if self.mode == "full":
111 elif self.mode == "already got them":
# claim every requested share already exists: alreadygot=all,
# newly-allocated=none.
112 return (set(sharenums), {},)
# record each accepted allocation so tests can inspect placement later.
114 for shnum in sharenums:
115 self.allocated.append( (storage_index, shnum) )
117 dict([( shnum, FakeBucketWriter(share_size) )
118 for shnum in sharenums]),
121 class FakeBucketWriter:
122 # a diagnostic version of storageserver.BucketWriter
123 def __init__(self, size):
# share bytes accumulate in an in-memory buffer; the _size/closed
# setup lines are elided in this listing (original 125-127).
124 self.data = StringIO()
128 def callRemote(self, methname, *args, **kwargs):
# route remote calls to the matching remote_* method via a deferred.
130 meth = getattr(self, "remote_" + methname)
131 return meth(*args, **kwargs)
133 d.addCallback(lambda res: _call())
136 def remote_write(self, offset, data):
# writes must land entirely inside the declared share size, and only
# while the bucket is still open.
137 precondition(not self.closed)
138 precondition(offset >= 0)
139 precondition(offset+len(data) <= self._size,
140 "offset=%d + data=%d > size=%d" %
141 (offset, len(data), self._size))
142 self.data.seek(offset)
143 self.data.write(data)
145 def remote_close(self):
146 precondition(not self.closed)
149 def remote_abort(self):
# uploads in these tests are expected to complete; an abort is a bug.
150 log.err("uh oh, I was asked to abort")
# NOTE(review): the enclosing class statement (original line ~152,
# presumably the FakeClient used throughout these tests) is elided from
# this listing; the following are its attributes and methods.
153 DEFAULT_ENCODING_PARAMETERS = {"k":25,
156 "max_segment_size": 1*MiB,
158 def __init__(self, mode="good", num_servers=50):
160 self.num_servers = num_servers
# "some_big_some_small" alternates good (big) and small servers; any
# other mode builds num_servers identical FakeStorageServers.
161 if mode == "some_big_some_small":
163 for fakeid in range(num_servers):
165 peers.append(("%20d" % fakeid, FakeStorageServer("good")))
167 peers.append(("%20d" % fakeid, FakeStorageServer("small")))
169 peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),)
170 for fakeid in range(self.num_servers) ]
# register every fake server with a real StorageFarmBroker, and keep
# the server list so tests can inspect allocations afterwards.
171 self.storage_broker = StorageFarmBroker()
172 for (serverid, server) in peers:
173 self.storage_broker.add_server(serverid, server)
174 self.last_peers = [p[1] for p in peers]
176 def log(self, *args, **kwargs):
def get_encoding_parameters(self):
    """Return this fake client's default encoding-parameters dict."""
    params = self.DEFAULT_ENCODING_PARAMETERS
    return params
def get_storage_broker(self):
    """Return the StorageFarmBroker that __init__ populated."""
    broker = self.storage_broker
    return broker
# lease-secret accessors; bodies are elided in this listing (original
# lines 184 and 186-187) -- presumably fixed test values; confirm.
183 def get_renewal_secret(self):
185 def get_cancel_secret(self):
# raised by GiganticUploadable.read() to stop a runaway upload early.
188 class GotTooFarError(Exception):
# An IUploadable that claims an arbitrarily huge size without holding the
# data in memory: reads return zero-bytes, and raise GotTooFarError if
# more than ~1MB is actually consumed -- the upload should have been
# rejected (FileTooLargeError) long before reading that much.
191 class GiganticUploadable(upload.FileHandle):
192 def __init__(self, size):
196 def get_encryption_key(self):
# fixed all-zeros AES key; fine for a size-limit test.
197 return defer.succeed("\x00" * 16)
199 return defer.succeed(self._size)
200 def read(self, length):
# serve zeros, clamped to what remains; _fp tracks bytes consumed
# (its increment line, original 203, is elided in this listing).
201 left = self._size - self._fp
202 length = min(left, length)
204 if self._fp > 1000000:
205 # terminate the test early.
206 raise GotTooFarError("we shouldn't be allowed to get this far")
207 return defer.succeed(["\x00" * length])
212 Once upon a time, there was a beautiful princess named Buttercup. She lived
213 in a magical land where every file was stored securely among millions of
214 machines, and nobody ever worried about their data being lost ever again.
# DATA (defined just above) must exceed the literal-URI threshold so the
# "large" tests exercise real CHK uploads rather than inline LIT URIs.
217 assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
221 SIZE_LARGE = len(DATA)
def upload_data(uploader, data):
    """Upload the byte string *data* through *uploader* with a random key.

    Returns the uploader's Deferred, which fires with the upload results.
    """
    return uploader.upload(upload.Data(data, convergence=None))
def upload_filename(uploader, filename):
    """Upload the on-disk file *filename* through *uploader* with a random key.

    Returns the uploader's Deferred, which fires with the upload results.
    """
    return uploader.upload(upload.FileName(filename, convergence=None))
def upload_filehandle(uploader, fh):
    """Upload the contents of open filehandle *fh* through *uploader*.

    Uses a random key (convergence=None) and returns the uploader's Deferred.
    """
    return uploader.upload(upload.FileHandle(fh, convergence=None))
233 class GoodServer(unittest.TestCase, ShouldFailMixin):
# NOTE(review): the "def setUp(self):" line (original 234) is elided
# from this listing; the following is the setUp body.
# every fake server accepts shares, so uploads should always succeed.
235 self.node = FakeClient(mode="good")
236 self.u = upload.Uploader()
237 self.u.running = True
238 self.u.parent = self.node
240 def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
# install fresh k/happy/n defaults on the fake client (the dict-building
# lines, original 241-243, are elided in this listing).
244 "max_segment_size": max_segsize,
246 self.node.DEFAULT_ENCODING_PARAMETERS = p
248 def _check_small(self, newuri, size):
# small files come back as LIT URIs carrying the data inline.
250 self.failUnless(isinstance(u, uri.LiteralFileURI))
251 self.failUnlessEqual(len(u.data), size)
253 def _check_large(self, newuri, size):
# large files get CHK URIs: 16-byte storage index and 16-byte AES key.
255 self.failUnless(isinstance(u, uri.CHKFileURI))
256 self.failUnless(isinstance(u.storage_index, str))
257 self.failUnlessEqual(len(u.storage_index), 16)
258 self.failUnless(isinstance(u.key, str))
259 self.failUnlessEqual(len(u.key), 16)
260 self.failUnlessEqual(u.size, size)
262 def get_data(self, size):
265 def test_too_large(self):
266 # we've removed the 4GiB share size limit (see ticket #346 for
267 # details), but still have an 8-byte field, so the limit is now
268 # 2**64, so make sure we reject files larger than that.
269 k = 3; happy = 7; n = 10
270 self.set_encoding_parameters(k, happy, n)
# NOTE(review): the assignment of 'big' (original line 271) is elided
# from this listing -- confirm its value against the full file.
272 data1 = GiganticUploadable(big)
273 d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
274 "This file is too large to be uploaded (data_size)",
275 self.u.upload, data1)
# slightly smaller: rejected later, when computing share offsets.
276 data2 = GiganticUploadable(big-3)
277 d.addCallback(lambda res:
278 self.shouldFail(FileTooLargeError,
279 "test_too_large-data2",
280 "This file is too large to be uploaded (offsets)",
281 self.u.upload, data2))
282 # I don't know where the actual limit is.. it depends upon how large
283 # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
# upload.Data round-trips: zero/small sizes should yield LIT URIs, large
# a CHK URI. NOTE(review): the trailing "return d" lines are elided from
# this listing throughout.
286 def test_data_zero(self):
287 data = self.get_data(SIZE_ZERO)
288 d = upload_data(self.u, data)
289 d.addCallback(extract_uri)
290 d.addCallback(self._check_small, SIZE_ZERO)
293 def test_data_small(self):
294 data = self.get_data(SIZE_SMALL)
295 d = upload_data(self.u, data)
296 d.addCallback(extract_uri)
297 d.addCallback(self._check_small, SIZE_SMALL)
300 def test_data_large(self):
301 data = self.get_data(SIZE_LARGE)
302 d = upload_data(self.u, data)
303 d.addCallback(extract_uri)
304 d.addCallback(self._check_large, SIZE_LARGE)
307 def test_data_large_odd_segments(self):
308 data = self.get_data(SIZE_LARGE)
309 segsize = int(SIZE_LARGE / 2.5)
310 # we want 3 segments, since that's not a power of two
311 self.set_encoding_parameters(25, 75, 100, segsize)
312 d = upload_data(self.u, data)
313 d.addCallback(extract_uri)
314 d.addCallback(self._check_large, SIZE_LARGE)
# same size matrix as the Data tests, but fed through a filehandle
# (StringIO). "return d" lines are elided in this listing.
317 def test_filehandle_zero(self):
318 data = self.get_data(SIZE_ZERO)
319 d = upload_filehandle(self.u, StringIO(data))
320 d.addCallback(extract_uri)
321 d.addCallback(self._check_small, SIZE_ZERO)
324 def test_filehandle_small(self):
325 data = self.get_data(SIZE_SMALL)
326 d = upload_filehandle(self.u, StringIO(data))
327 d.addCallback(extract_uri)
328 d.addCallback(self._check_small, SIZE_SMALL)
331 def test_filehandle_large(self):
332 data = self.get_data(SIZE_LARGE)
333 d = upload_filehandle(self.u, StringIO(data))
334 d.addCallback(extract_uri)
335 d.addCallback(self._check_large, SIZE_LARGE)
# same matrix again, reading from named files on disk. NOTE(review): the
# lines that actually write each data file (e.g. original 342-343) and
# the "return d" lines are elided in this listing.
338 def test_filename_zero(self):
339 fn = "Uploader-test_filename_zero.data"
341 data = self.get_data(SIZE_ZERO)
344 d = upload_filename(self.u, fn)
345 d.addCallback(extract_uri)
346 d.addCallback(self._check_small, SIZE_ZERO)
349 def test_filename_small(self):
350 fn = "Uploader-test_filename_small.data"
352 data = self.get_data(SIZE_SMALL)
355 d = upload_filename(self.u, fn)
356 d.addCallback(extract_uri)
357 d.addCallback(self._check_small, SIZE_SMALL)
360 def test_filename_large(self):
361 fn = "Uploader-test_filename_large.data"
363 data = self.get_data(SIZE_LARGE)
366 d = upload_filename(self.u, fn)
367 d.addCallback(extract_uri)
368 d.addCallback(self._check_large, SIZE_LARGE)
371 class FullServer(unittest.TestCase):
# NOTE(review): the "def setUp(self):" line (original 372) is elided.
# every fake server reports itself full, so no shares can be placed.
373 self.node = FakeClient(mode="full")
374 self.u = upload.Uploader()
375 self.u.running = True
376 self.u.parent = self.node
def _should_fail(self, f):
    """Assert that *f* is a Failure wrapping NotEnoughSharesError."""
    is_expected_failure = isinstance(f, Failure) and f.check(NotEnoughSharesError)
    self.failUnless(is_expected_failure, f)
381 def test_data_large(self):
# with every server full, a large upload must end in a Failure that
# _should_fail recognizes (the data setup line, original 382, and the
# trailing return are elided in this listing).
383 d = upload_data(self.u, data)
384 d.addBoth(self._should_fail)
387 class PeerSelection(unittest.TestCase):
389 def make_client(self, num_servers=50):
# build a grid of num_servers cooperative fake servers and a fresh
# Uploader wired to it; tests inspect self.node.last_peers afterwards.
390 self.node = FakeClient(mode="good", num_servers=num_servers)
391 self.u = upload.Uploader()
392 self.u.running = True
393 self.u.parent = self.node
395 def get_data(self, size):
398 def _check_large(self, newuri, size):
# uploads here should produce CHK URIs with 16-byte SI and key.
400 self.failUnless(isinstance(u, uri.CHKFileURI))
401 self.failUnless(isinstance(u.storage_index, str))
402 self.failUnlessEqual(len(u.storage_index), 16)
403 self.failUnless(isinstance(u.key, str))
404 self.failUnlessEqual(len(u.key), 16)
405 self.failUnlessEqual(u.size, size)
407 def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
# replace the fake client's default k/happy/n (the dict-building
# lines, original 408-410, are elided in this listing).
411 "max_segment_size": max_segsize,
413 self.node.DEFAULT_ENCODING_PARAMETERS = p
415 def test_one_each(self):
416 # if we have 50 shares, and there are 50 peers, and they all accept a
417 # share, we should get exactly one share per peer
420 data = self.get_data(SIZE_LARGE)
421 self.set_encoding_parameters(25, 30, 50)
422 d = upload_data(self.u, data)
423 d.addCallback(extract_uri)
424 d.addCallback(self._check_large, SIZE_LARGE)
# each peer should hold exactly one share, reached in a single query
# (the "def _check(res):" line, original 425, is elided in this listing).
426 for p in self.node.last_peers:
427 allocated = p.allocated
428 self.failUnlessEqual(len(allocated), 1)
429 self.failUnlessEqual(p.queries, 1)
430 d.addCallback(_check)
433 def test_two_each(self):
434 # if we have 100 shares, and there are 50 peers, and they all accept
435 # all shares, we should get exactly two shares per peer
438 data = self.get_data(SIZE_LARGE)
439 self.set_encoding_parameters(50, 75, 100)
440 d = upload_data(self.u, data)
441 d.addCallback(extract_uri)
442 d.addCallback(self._check_large, SIZE_LARGE)
444 for p in self.node.last_peers:
445 allocated = p.allocated
446 self.failUnlessEqual(len(allocated), 2)
447 self.failUnlessEqual(p.queries, 2)
448 d.addCallback(_check)
451 def test_one_each_plus_one_extra(self):
452 # if we have 51 shares, and there are 50 peers, then one peer gets
453 # two shares and the rest get just one
456 data = self.get_data(SIZE_LARGE)
457 self.set_encoding_parameters(24, 41, 51)
458 d = upload_data(self.u, data)
459 d.addCallback(extract_uri)
460 d.addCallback(self._check_large, SIZE_LARGE)
# tally peers into got_one/got_two (their setup lines, original
# 461-463, are elided): 49 peers hold one share, exactly one holds two.
464 for p in self.node.last_peers:
465 allocated = p.allocated
466 self.failUnless(len(allocated) in (1,2), len(allocated))
467 if len(allocated) == 1:
468 self.failUnlessEqual(p.queries, 1)
471 self.failUnlessEqual(p.queries, 2)
473 self.failUnlessEqual(len(got_one), 49)
474 self.failUnlessEqual(len(got_two), 1)
475 d.addCallback(_check)
478 def test_four_each(self):
479 # if we have 200 shares, and there are 50 peers, then each peer gets
480 # 4 shares. The design goal is to accomplish this with only two
484 data = self.get_data(SIZE_LARGE)
485 self.set_encoding_parameters(100, 150, 200)
486 d = upload_data(self.u, data)
487 d.addCallback(extract_uri)
488 d.addCallback(self._check_large, SIZE_LARGE)
# four shares per peer, reached in no more than two query rounds.
490 for p in self.node.last_peers:
491 allocated = p.allocated
492 self.failUnlessEqual(len(allocated), 4)
493 self.failUnlessEqual(p.queries, 2)
494 d.addCallback(_check)
497 def test_three_of_ten(self):
498 # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
502 data = self.get_data(SIZE_LARGE)
503 self.set_encoding_parameters(3, 5, 10)
504 d = upload_data(self.u, data)
505 d.addCallback(extract_uri)
506 d.addCallback(self._check_large, SIZE_LARGE)
# histogram of shares-per-peer: expect two peers with 3 shares and one
# with 4 (the counts init and "def _check" lines are elided here).
509 for p in self.node.last_peers:
510 allocated = p.allocated
511 counts[len(allocated)] = counts.get(len(allocated), 0) + 1
512 histogram = [counts.get(i, 0) for i in range(5)]
513 self.failUnlessEqual(histogram, [0,0,0,2,1])
514 d.addCallback(_check)
517 def test_some_big_some_small(self):
518 # 10 shares, 20 servers, but half the servers don't support a
519 # share-size large enough for our file
520 self.node = FakeClient(mode="some_big_some_small", num_servers=20)
521 self.u = upload.Uploader()
522 self.u.running = True
523 self.u.parent = self.node
525 data = self.get_data(SIZE_LARGE)
526 self.set_encoding_parameters(3, 5, 10)
527 d = upload_data(self.u, data)
528 d.addCallback(extract_uri)
529 d.addCallback(self._check_large, SIZE_LARGE)
531 # we should have put one share each on the big peers, and zero
532 # shares on the small peers
534 for p in self.node.last_peers:
# NOTE(review): the 'if p.mode == "good":' guard (original line 535)
# is elided from this listing.
536 self.failUnlessEqual(len(p.allocated), 1)
537 elif p.mode == "small":
538 self.failUnlessEqual(len(p.allocated), 0)
539 total_allocated += len(p.allocated)
540 self.failUnlessEqual(total_allocated, 10)
541 d.addCallback(_check)
545 class StorageIndex(unittest.TestCase):
# The storage index must be a pure function of its inputs: same data and
# convergence string give the same SI; changing the convergence string,
# the encoding parameters, or using a random key must change it.
546 def test_params_must_matter(self):
547 DATA = "I am some data"
548 u = upload.Data(DATA, convergence="")
549 eu = upload.EncryptAnUploadable(u)
550 d1 = eu.get_storage_index()
552 # CHK means the same data should encrypt the same way
553 u = upload.Data(DATA, convergence="")
554 eu = upload.EncryptAnUploadable(u)
555 d1a = eu.get_storage_index()
557 # but if we use a different convergence string it should be different
558 u = upload.Data(DATA, convergence="wheee!")
559 eu = upload.EncryptAnUploadable(u)
560 d1salt1 = eu.get_storage_index()
562 # and if we add yet a different convergence it should be different again
563 u = upload.Data(DATA, convergence="NOT wheee!")
564 eu = upload.EncryptAnUploadable(u)
565 d1salt2 = eu.get_storage_index()
567 # and if we use the first string again it should be the same as last time
568 u = upload.Data(DATA, convergence="wheee!")
569 eu = upload.EncryptAnUploadable(u)
570 d1salt1a = eu.get_storage_index()
572 # and if we change the encoding parameters, it should be different (from the same convergence string with different encoding parameters)
573 u = upload.Data(DATA, convergence="")
574 u.encoding_param_k = u.default_encoding_param_k + 1
575 eu = upload.EncryptAnUploadable(u)
576 d2 = eu.get_storage_index()
578 # and if we use a random key, it should be different than the CHK
579 u = upload.Data(DATA, convergence=None)
580 eu = upload.EncryptAnUploadable(u)
581 d3 = eu.get_storage_index()
582 # and different from another instance
583 u = upload.Data(DATA, convergence=None)
584 eu = upload.EncryptAnUploadable(u)
585 d4 = eu.get_storage_index()
# wait for all eight SI deferreds, then compare pairwise (the
# "def _done(res):" line, original 588, is elided in this listing).
587 d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
589 si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
590 self.failUnlessEqual(si1, si1a)
591 self.failIfEqual(si1, si2)
592 self.failIfEqual(si1, si3)
593 self.failIfEqual(si1, si4)
594 self.failIfEqual(si3, si4)
595 self.failIfEqual(si1salt1, si1)
596 self.failIfEqual(si1salt1, si1salt2)
597 self.failIfEqual(si1salt2, si1)
598 self.failUnlessEqual(si1salt1, si1salt1a)
602 class EncodingParameters(GridTestMixin, unittest.TestCase):
603 def test_configure_parameters(self):
# the tahoe.cfg written by _set_up_nodes_extra_config below should make
# uploads use shares.needed=7 / shares.total=12 instead of the defaults.
604 self.basedir = self.mktemp()
605 hooks = {0: self._set_up_nodes_extra_config}
606 self.set_up_grid(client_config_hooks=hooks)
607 c0 = self.g.clients[0]
610 u = upload.Data(DATA, convergence="")
612 d.addCallback(lambda ur: c0.create_node_from_uri(ur.uri))
# run a checker on the freshly-uploaded file and inspect its counts.
613 m = monitor.Monitor()
614 d.addCallback(lambda fn: fn.check(m))
617 self.failUnlessEqual(data["count-shares-needed"], 7)
618 self.failUnlessEqual(data["count-shares-expected"], 12)
619 d.addCallback(_check)
622 def _set_up_nodes_extra_config(self, clientdir):
# rewrite the client's tahoe.cfg with explicit share counts.
# NOTE(review): lines 626-627 (presumably re-writing parts of oldcfg)
# are elided from this listing.
623 cfgfn = os.path.join(clientdir, "tahoe.cfg")
624 oldcfg = open(cfgfn, "r").read()
625 f = open(cfgfn, "wt")
628 f.write("[client]\n")
629 f.write("shares.needed = 7\n")
630 f.write("shares.total = 12\n")
636 # upload with exactly 75 peers (shares_of_happiness)
637 # have a download fail
638 # cancel a download (need to implement more cancel stuff)