import os

from twisted.trial import unittest
from twisted.python.failure import Failure
from twisted.python import log
from twisted.internet import defer
from cStringIO import StringIO

from allmydata import uri
from allmydata.immutable import upload, encode
from allmydata.interfaces import IFileURI, FileTooLargeError
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import DeferredListShouldSucceed
from allmydata.util.testutil import ShouldFailMixin
from foolscap import eventual

MiB = 1024*1024

def extract_uri(results):
    # upload() fires with a results object; hand back just the URI string so
    # the callbacks below can parse it. (assumed: the results object exposes
    # the URI string as .uri)
    return results.uri

class Uploadable(unittest.TestCase):
    def shouldEqual(self, data, expected):
        # read() hands back a list of strings; flatten it before comparing
        self.failUnless(isinstance(data, list))
        for e in data:
            self.failUnless(isinstance(e, str))
        s = "".join(data)
        self.failUnlessEqual(s, expected)

    def test_filehandle_random_key(self):
        return self._test_filehandle(convergence=None)

    def test_filehandle_convergent_encryption(self):
        return self._test_filehandle(convergence="some convergence string")

    def _test_filehandle(self, convergence):
        s = StringIO("a"*41)
        u = upload.FileHandle(s, convergence=convergence)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
        d.addCallback(lambda res: s.close()) # that privilege is reserved for us
        return d

    def test_filename(self):
        basedir = "upload/Uploadable/test_filename"
        if not os.path.exists(basedir):
            os.makedirs(basedir)
        fn = os.path.join(basedir, "file")
        f = open(fn, "w")
        f.write("a"*41)
        f.close()
        u = upload.FileName(fn, convergence=None)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close())
        return d

    def test_data(self):
        s = "a"*41
        u = upload.Data(s, convergence=None)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close())
        return d
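
# The tests above pin down the IUploadable contract exercised here:
# get_size() and read() return Deferreds, read(length) may deliver fewer
# than 'length' bytes and does so as a list of strings, and close() does not
# close a caller-supplied filehandle.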

class FakeStorageServer:
    def __init__(self, mode):
        self.mode = mode
        self.allocated = []
        self.queries = 0

    def callRemote(self, methname, *args, **kwargs):
        # emulate a foolscap remote call: dispatch to a local method, but
        # only after the reactor has had a chance to turn
        def _call():
            meth = getattr(self, methname)
            return meth(*args, **kwargs)
        d = eventual.fireEventually()
        d.addCallback(lambda res: _call())
        return d

    def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
                         sharenums, share_size, canary):
        #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
        self.queries += 1
        if self.mode == "full":
            return (set(), {},)
        elif self.mode == "already got them":
            return (set(sharenums), {},)
        else:
            for shnum in sharenums:
                self.allocated.append( (storage_index, shnum) )
            return (set(),
                    dict([( shnum, FakeBucketWriter(share_size) )
                          for shnum in sharenums]),
                    )
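
# The fake mirrors the storage server's allocate_buckets() contract: the
# return value is (alreadygot, bucketwriters), where 'alreadygot' is the set
# of share numbers the server already holds and 'bucketwriters' maps each
# newly accepted share number to a BucketWriter-like object. "full" mode
# accepts nothing, "already got them" claims to hold every requested share,
# and the default mode accepts everything it is asked for.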

class FakeBucketWriter:
    # a diagnostic version of storageserver.BucketWriter
    def __init__(self, size):
        self.data = StringIO()
        self.closed = False
        self._size = size

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self, "remote_" + methname)
            return meth(*args, **kwargs)
        d = eventual.fireEventually()
        d.addCallback(lambda res: _call())
        return d

    def remote_write(self, offset, data):
        precondition(not self.closed)
        precondition(offset >= 0)
        precondition(offset+len(data) <= self._size,
                     "offset=%d + data=%d > size=%d" %
                     (offset, len(data), self._size))
        self.data.seek(offset)
        self.data.write(data)

    def remote_close(self):
        precondition(not self.closed)
        self.closed = True

    def remote_abort(self):
        log.err("uh oh, I was asked to abort")

class FakeClient:
    DEFAULT_ENCODING_PARAMETERS = {"k": 25,
                                   "happy": 75,
                                   "n": 100,
                                   "max_segment_size": 1*MiB,
                                   }
    def __init__(self, mode="good", num_servers=50):
        self.mode = mode
        self.num_servers = num_servers
    def log(self, *args, **kwargs):
        pass
    def get_permuted_peers(self, storage_index, include_myself):
        peers = [ ("%20d" % fakeid, FakeStorageServer(self.mode),)
                  for fakeid in range(self.num_servers) ]
        self.last_peers = [p[1] for p in peers]
        return peers
    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS

    def get_renewal_secret(self):
        return ""
    def get_cancel_secret(self):
        return ""
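
# FakeClient stands in for the real client node: it hands the Uploader a
# permuted list of (peerid, storage_server) pairs, the default encoding
# parameters (k shares needed to reconstruct, "happy" shares for the upload
# to count as a success, n total shares, and a maximum segment size), and
# the lease renew/cancel secrets. The secrets returned here are
# placeholders; their exact values do not matter to these tests.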

class GiganticUploadable(upload.FileHandle):
    # an uploadable that claims to be enormous without ever holding that
    # much data in memory; used by test_too_large below
    def __init__(self, size):
        self._size = size
        self._fp = 0

    def get_encryption_key(self):
        return defer.succeed("\x00" * 16)
    def get_size(self):
        return defer.succeed(self._size)
    def read(self, length):
        left = self._size - self._fp
        length = min(left, length)
        self._fp += length
        if self._fp > 1000000:
            # terminate the test early.
            raise RuntimeError("we shouldn't be allowed to get this far")
        return defer.succeed(["\x00" * length])
    def close(self):
        pass

DATA = """
Once upon a time, there was a beautiful princess named Buttercup. She lived
in a magical land where every file was stored securely among millions of
machines, and nobody ever worried about their data being lost ever again.
"""
assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD

SIZE_ZERO = 0
SIZE_SMALL = 16
SIZE_LARGE = len(DATA)

def upload_data(uploader, data):
    u = upload.Data(data, convergence=None)
    return uploader.upload(u)
def upload_filename(uploader, filename):
    u = upload.FileName(filename, convergence=None)
    return uploader.upload(u)
def upload_filehandle(uploader, fh):
    u = upload.FileHandle(fh, convergence=None)
    return uploader.upload(u)

class GoodServer(unittest.TestCase, ShouldFailMixin):
    def setUp(self):
        self.node = FakeClient(mode="good")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        p = {"k": k,
             "happy": happy,
             "n": n,
             "max_segment_size": max_segsize,
             }
        self.node.DEFAULT_ENCODING_PARAMETERS = p

    def _check_small(self, newuri, size):
        u = IFileURI(uri.from_string(newuri))
        self.failUnless(isinstance(u, uri.LiteralFileURI))
        self.failUnlessEqual(len(u.data), size)

    def _check_large(self, newuri, size):
        u = IFileURI(uri.from_string(newuri))
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.storage_index, str))
        self.failUnlessEqual(len(u.storage_index), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def get_data(self, size):
        return DATA[:size]

    def test_too_large(self):
        # we currently impose a sizelimit on uploaded files, because of
        # limitations in the share format (see ticket #346 for details). The
        # limit is set to ensure that no share is larger than 4GiB. Make sure
        # that we reject files larger than that (a rough arithmetic sketch
        # follows this test).
        k = 3; happy = 7; n = 10
        self.set_encoding_parameters(k, happy, n)
        data1 = GiganticUploadable(k*4*1024*1024*1024)
        d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
                            "This file is too large to be uploaded (data_size)",
                            self.u.upload, data1)
        data2 = GiganticUploadable(k*4*1024*1024*1024-3)
        d.addCallback(lambda res:
                      self.shouldFail(FileTooLargeError,
                                      "test_too_large-data2",
                                      "This file is too large to be uploaded (offsets)",
                                      self.u.upload, data2))
        # I don't know where the actual limit is.. it depends upon how large
        # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
        return d
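
    # Rough arithmetic behind the limit (a sketch, not the precise cutoff):
    # each share may hold at most 4 GiB, so with k=3 the largest acceptable
    # file is a bit under 3 * 4 GiB = 12,884,901,888 bytes, less whatever
    # the hash trees and other per-share overhead consume.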

    def test_data_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_data_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_data_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_data_large_odd_segments(self):
        data = self.get_data(SIZE_LARGE)
        segsize = int(SIZE_LARGE / 2.5)
        # we want 3 segments, since that's not a power of two
        self.set_encoding_parameters(25, 75, 100, segsize)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d
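
    # (With segsize = int(SIZE_LARGE / 2.5) the encoder needs ceil(2.5) = 3
    # segments, and 3 is deliberately not a power of two.)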

    def test_filehandle_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_filehandle_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_filehandle_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_filename_zero(self):
        fn = "Uploader-test_filename_zero.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_ZERO)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_filename_small(self):
        fn = "Uploader-test_filename_small.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_SMALL)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_filename_large(self):
        fn = "Uploader-test_filename_large.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_LARGE)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

class FullServer(unittest.TestCase):
    def setUp(self):
        self.node = FakeClient(mode="full")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _should_fail(self, f):
        self.failUnless(isinstance(f, Failure) and f.check(encode.NotEnoughSharesError), f)

    def test_data_large(self):
        data = DATA
        d = upload_data(self.u, data)
        d.addBoth(self._should_fail)
        return d
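
    # In "full" mode every FakeStorageServer refuses to allocate buckets, so
    # peer selection can never place enough shares and the upload is
    # expected to fail with NotEnoughSharesError.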

class PeerSelection(unittest.TestCase):

    def make_client(self, num_servers=50):
        self.node = FakeClient(mode="good", num_servers=num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def get_data(self, size):
        return DATA[:size]

    def _check_large(self, newuri, size):
        u = IFileURI(uri.from_string(newuri))
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.storage_index, str))
        self.failUnlessEqual(len(u.storage_index), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        p = {"k": k,
             "happy": happy,
             "n": n,
             "max_segment_size": max_segsize,
             }
        self.node.DEFAULT_ENCODING_PARAMETERS = p

    def test_one_each(self):
        # if we have 50 shares, and there are 50 peers, and they all accept a
        # share, we should get exactly one share per peer

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(25, 30, 50)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 1)
                self.failUnlessEqual(p.queries, 1)
        d.addCallback(_check)
        return d

    def test_two_each(self):
        # if we have 100 shares, and there are 50 peers, and they all accept
        # all shares, we should get exactly two shares per peer

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(50, 75, 100)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 2)
                self.failUnlessEqual(p.queries, 2)
        d.addCallback(_check)
        return d

    def test_one_each_plus_one_extra(self):
        # if we have 51 shares, and there are 50 peers, then one peer gets
        # two shares and the rest get just one

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(24, 41, 51)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            got_one = []
            got_two = []
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnless(len(allocated) in (1,2), len(allocated))
                if len(allocated) == 1:
                    self.failUnlessEqual(p.queries, 1)
                    got_one.append(p)
                else:
                    self.failUnlessEqual(p.queries, 2)
                    got_two.append(p)
            self.failUnlessEqual(len(got_one), 49)
            self.failUnlessEqual(len(got_two), 1)
        d.addCallback(_check)
        return d

    def test_four_each(self):
        # if we have 200 shares, and there are 50 peers, then each peer gets
        # 4 shares. The design goal is to accomplish this with only two
        # queries per peer.

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(100, 150, 200)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 4)
                self.failUnlessEqual(p.queries, 2)
        d.addCallback(_check)
        return d
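
    # The interesting assertion is p.queries == 2: placing four shares on
    # each of the 50 peers should take only two allocate_buckets calls per
    # peer, rather than one call per share.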

    def test_three_of_ten(self):
        # if we have 10 shares and 3 servers, I want to see 3+3+4 rather
        # than 4+4+2

        self.make_client(3)
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 5, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            counts = {}
            for p in self.node.last_peers:
                allocated = p.allocated
                counts[len(allocated)] = counts.get(len(allocated), 0) + 1
            histogram = [counts.get(i, 0) for i in range(5)]
            self.failUnlessEqual(histogram, [0,0,0,2,1])
        d.addCallback(_check)
        return d
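
    # counts[] maps shares-per-peer to the number of peers that got that
    # many shares, so histogram == [0,0,0,2,1] means two peers received 3
    # shares and one received 4: the desired 3+3+4 split.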

class StorageIndex(unittest.TestCase):
    def test_params_must_matter(self):
        DATA = "I am some data"
        u = upload.Data(DATA, convergence="")
        eu = upload.EncryptAnUploadable(u)
        d1 = eu.get_storage_index()

        # CHK means the same data should encrypt the same way
        u = upload.Data(DATA, convergence="")
        eu = upload.EncryptAnUploadable(u)
        d1a = eu.get_storage_index()

        # but if we use a different convergence string it should be different
        u = upload.Data(DATA, convergence="wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt1 = eu.get_storage_index()

        # and if we add yet a different convergence it should be different again
        u = upload.Data(DATA, convergence="NOT wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt2 = eu.get_storage_index()

        # and if we use the first string again it should be the same as last time
        u = upload.Data(DATA, convergence="wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt1a = eu.get_storage_index()

        # and if we change the encoding parameters, it should be different
        # (same convergence string, different encoding parameters)
        u = upload.Data(DATA, convergence="")
        u.encoding_param_k = u.default_encoding_param_k + 1
        eu = upload.EncryptAnUploadable(u)
        d2 = eu.get_storage_index()

        # and if we use a random key, it should be different than the CHK
        u = upload.Data(DATA, convergence=None)
        eu = upload.EncryptAnUploadable(u)
        d3 = eu.get_storage_index()
        # and different from another instance
        u = upload.Data(DATA, convergence=None)
        eu = upload.EncryptAnUploadable(u)
        d4 = eu.get_storage_index()

        d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
        def _done(res):
            si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
            self.failUnlessEqual(si1, si1a)
            self.failIfEqual(si1, si2)
            self.failIfEqual(si1, si3)
            self.failIfEqual(si1, si4)
            self.failIfEqual(si3, si4)
            self.failIfEqual(si1salt1, si1)
            self.failIfEqual(si1salt1, si1salt2)
            self.failIfEqual(si1salt2, si1)
            self.failUnlessEqual(si1salt1, si1salt1a)
        d.addCallback(_done)
        return d
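
# Taken together, the assertions above pin down how the storage index is
# derived: it is a function of the plaintext, the convergence secret, and
# the encoding parameters, while convergence=None means a random key and
# therefore a different storage index on every upload.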

# TODO:
#  upload with exactly 75 peers (shares_of_happiness)
#  have a download fail
#  cancel a download (need to implement more cancel stuff)