]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_upload.py
35c05461d41e06b5811752dd8309f6573002148f
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_upload.py
1
2 import os, shutil
3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.python.failure import Failure
6 from twisted.python import log
7 from twisted.internet import defer
8 from foolscap.api import fireEventually
9
10 import allmydata # for __full_version__
11 from allmydata import uri, monitor, client
12 from allmydata.immutable import upload, encode
13 from allmydata.interfaces import FileTooLargeError, NoSharesError, \
14      NotEnoughSharesError
15 from allmydata.util.assertutil import precondition
16 from allmydata.util.deferredutil import DeferredListShouldSucceed
17 from no_network import GridTestMixin
18 from common_util import ShouldFailMixin
19 from allmydata.storage_client import StorageFarmBroker
20 from allmydata.storage.server import storage_index_to_dir
21
22 MiB = 1024*1024
23
24 def extract_uri(results):
25     return results.uri
26
27 # Some of these took longer than 480 seconds on Zandr's arm box, but this may
28 # have been due to an earlier test ERROR'ing out due to timeout, which seems
29 # to screw up subsequent tests.
30 timeout = 960
31
32 class Uploadable(unittest.TestCase):
33     def shouldEqual(self, data, expected):
34         self.failUnless(isinstance(data, list))
35         for e in data:
36             self.failUnless(isinstance(e, str))
37         s = "".join(data)
38         self.failUnlessEqual(s, expected)
39
40     def test_filehandle_random_key(self):
41         return self._test_filehandle(convergence=None)
42
43     def test_filehandle_convergent_encryption(self):
44         return self._test_filehandle(convergence="some convergence string")
45
46     def _test_filehandle(self, convergence):
47         s = StringIO("a"*41)
48         u = upload.FileHandle(s, convergence=convergence)
49         d = u.get_size()
50         d.addCallback(self.failUnlessEqual, 41)
51         d.addCallback(lambda res: u.read(1))
52         d.addCallback(self.shouldEqual, "a")
53         d.addCallback(lambda res: u.read(80))
54         d.addCallback(self.shouldEqual, "a"*40)
55         d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
56         d.addCallback(lambda res: s.close()) # that privilege is reserved for us
57         return d
58
59     def test_filename(self):
60         basedir = "upload/Uploadable/test_filename"
61         os.makedirs(basedir)
62         fn = os.path.join(basedir, "file")
63         f = open(fn, "w")
64         f.write("a"*41)
65         f.close()
66         u = upload.FileName(fn, convergence=None)
67         d = u.get_size()
68         d.addCallback(self.failUnlessEqual, 41)
69         d.addCallback(lambda res: u.read(1))
70         d.addCallback(self.shouldEqual, "a")
71         d.addCallback(lambda res: u.read(80))
72         d.addCallback(self.shouldEqual, "a"*40)
73         d.addCallback(lambda res: u.close())
74         return d
75
76     def test_data(self):
77         s = "a"*41
78         u = upload.Data(s, convergence=None)
79         d = u.get_size()
80         d.addCallback(self.failUnlessEqual, 41)
81         d.addCallback(lambda res: u.read(1))
82         d.addCallback(self.shouldEqual, "a")
83         d.addCallback(lambda res: u.read(80))
84         d.addCallback(self.shouldEqual, "a"*40)
85         d.addCallback(lambda res: u.close())
86         return d
87
88 class ServerError(Exception):
89     pass
90
91 class SetDEPMixin:
92     def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
93         p = {"k": k,
94              "happy": happy,
95              "n": n,
96              "max_segment_size": max_segsize,
97              }
98         self.node.DEFAULT_ENCODING_PARAMETERS = p
99
100 class FakeStorageServer:
101     def __init__(self, mode):
102         self.mode = mode
103         self.allocated = []
104         self.queries = 0
105         self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
106                          { "maximum-immutable-share-size": 2**32 },
107                          "application-version": str(allmydata.__full_version__),
108                          }
109         if mode == "small":
110             self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
111                              { "maximum-immutable-share-size": 10 },
112                              "application-version": str(allmydata.__full_version__),
113                              }
114
115
116     def callRemote(self, methname, *args, **kwargs):
117         def _call():
118             meth = getattr(self, methname)
119             return meth(*args, **kwargs)
120         d = fireEventually()
121         d.addCallback(lambda res: _call())
122         return d
123
124     def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
125                          sharenums, share_size, canary):
126         #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
127         if self.mode == "first-fail":
128             if self.queries == 0:
129                 raise ServerError
130         if self.mode == "second-fail":
131             if self.queries == 1:
132                 raise ServerError
133         self.queries += 1
134         if self.mode == "full":
135             return (set(), {},)
136         elif self.mode == "already got them":
137             return (set(sharenums), {},)
138         else:
139             for shnum in sharenums:
140                 self.allocated.append( (storage_index, shnum) )
141             return (set(),
142                     dict([( shnum, FakeBucketWriter(share_size) )
143                           for shnum in sharenums]),
144                     )
145
146 class FakeBucketWriter:
147     # a diagnostic version of storageserver.BucketWriter
148     def __init__(self, size):
149         self.data = StringIO()
150         self.closed = False
151         self._size = size
152
153     def callRemote(self, methname, *args, **kwargs):
154         def _call():
155             meth = getattr(self, "remote_" + methname)
156             return meth(*args, **kwargs)
157         d = fireEventually()
158         d.addCallback(lambda res: _call())
159         return d
160
161     def remote_write(self, offset, data):
162         precondition(not self.closed)
163         precondition(offset >= 0)
164         precondition(offset+len(data) <= self._size,
165                      "offset=%d + data=%d > size=%d" %
166                      (offset, len(data), self._size))
167         self.data.seek(offset)
168         self.data.write(data)
169
170     def remote_close(self):
171         precondition(not self.closed)
172         self.closed = True
173
174     def remote_abort(self):
175         log.err(RuntimeError("uh oh, I was asked to abort"))
176
177 class FakeClient:
178     DEFAULT_ENCODING_PARAMETERS = {"k":25,
179                                    "happy": 75,
180                                    "n": 100,
181                                    "max_segment_size": 1*MiB,
182                                    }
183     def __init__(self, mode="good", num_servers=50):
184         self.num_servers = num_servers
185         if type(mode) is str:
186             mode = dict([i,mode] for i in range(num_servers))
187         peers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
188                   for fakeid in range(self.num_servers) ]
189         self.storage_broker = StorageFarmBroker(None, permute_peers=True)
190         for (serverid, server) in peers:
191             self.storage_broker.test_add_server(serverid, server)
192         self.last_peers = [p[1] for p in peers]
193
194     def log(self, *args, **kwargs):
195         pass
196     def get_encoding_parameters(self):
197         return self.DEFAULT_ENCODING_PARAMETERS
198     def get_storage_broker(self):
199         return self.storage_broker
200     _secret_holder = client.SecretHolder("lease secret", "convergence secret")
201
202 class GotTooFarError(Exception):
203     pass
204
205 class GiganticUploadable(upload.FileHandle):
206     def __init__(self, size):
207         self._size = size
208         self._fp = 0
209
210     def get_encryption_key(self):
211         return defer.succeed("\x00" * 16)
212     def get_size(self):
213         return defer.succeed(self._size)
214     def read(self, length):
215         left = self._size - self._fp
216         length = min(left, length)
217         self._fp += length
218         if self._fp > 1000000:
219             # terminate the test early.
220             raise GotTooFarError("we shouldn't be allowed to get this far")
221         return defer.succeed(["\x00" * length])
222     def close(self):
223         pass
224
225 DATA = """
226 Once upon a time, there was a beautiful princess named Buttercup. She lived
227 in a magical land where every file was stored securely among millions of
228 machines, and nobody ever worried about their data being lost ever again.
229 The End.
230 """
231 assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
232
233 SIZE_ZERO = 0
234 SIZE_SMALL = 16
235 SIZE_LARGE = len(DATA)
236
237 def upload_data(uploader, data):
238     u = upload.Data(data, convergence=None)
239     return uploader.upload(u)
240 def upload_filename(uploader, filename):
241     u = upload.FileName(filename, convergence=None)
242     return uploader.upload(u)
243 def upload_filehandle(uploader, fh):
244     u = upload.FileHandle(fh, convergence=None)
245     return uploader.upload(u)
246
247 class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
248     def setUp(self):
249         self.node = FakeClient(mode="good")
250         self.u = upload.Uploader()
251         self.u.running = True
252         self.u.parent = self.node
253
254     def _check_small(self, newuri, size):
255         u = uri.from_string(newuri)
256         self.failUnless(isinstance(u, uri.LiteralFileURI))
257         self.failUnlessEqual(len(u.data), size)
258
259     def _check_large(self, newuri, size):
260         u = uri.from_string(newuri)
261         self.failUnless(isinstance(u, uri.CHKFileURI))
262         self.failUnless(isinstance(u.get_storage_index(), str))
263         self.failUnlessEqual(len(u.get_storage_index()), 16)
264         self.failUnless(isinstance(u.key, str))
265         self.failUnlessEqual(len(u.key), 16)
266         self.failUnlessEqual(u.size, size)
267
268     def get_data(self, size):
269         return DATA[:size]
270
271     def test_too_large(self):
272         # we've removed the 4GiB share size limit (see ticket #346 for
273         # details), but still have an 8-byte field, so the limit is now
274         # 2**64, so make sure we reject files larger than that.
275         k = 3; happy = 7; n = 10
276         self.set_encoding_parameters(k, happy, n)
277         big = k*(2**64)
278         data1 = GiganticUploadable(big)
279         d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
280                             "This file is too large to be uploaded (data_size)",
281                             self.u.upload, data1)
282         data2 = GiganticUploadable(big-3)
283         d.addCallback(lambda res:
284                       self.shouldFail(FileTooLargeError,
285                                       "test_too_large-data2",
286                                       "This file is too large to be uploaded (offsets)",
287                                       self.u.upload, data2))
288         # I don't know where the actual limit is.. it depends upon how large
289         # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
290         return d
291
292     def test_data_zero(self):
293         data = self.get_data(SIZE_ZERO)
294         d = upload_data(self.u, data)
295         d.addCallback(extract_uri)
296         d.addCallback(self._check_small, SIZE_ZERO)
297         return d
298
299     def test_data_small(self):
300         data = self.get_data(SIZE_SMALL)
301         d = upload_data(self.u, data)
302         d.addCallback(extract_uri)
303         d.addCallback(self._check_small, SIZE_SMALL)
304         return d
305
306     def test_data_large(self):
307         data = self.get_data(SIZE_LARGE)
308         d = upload_data(self.u, data)
309         d.addCallback(extract_uri)
310         d.addCallback(self._check_large, SIZE_LARGE)
311         return d
312
313     def test_data_large_odd_segments(self):
314         data = self.get_data(SIZE_LARGE)
315         segsize = int(SIZE_LARGE / 2.5)
316         # we want 3 segments, since that's not a power of two
317         self.set_encoding_parameters(25, 75, 100, segsize)
318         d = upload_data(self.u, data)
319         d.addCallback(extract_uri)
320         d.addCallback(self._check_large, SIZE_LARGE)
321         return d
322
323     def test_filehandle_zero(self):
324         data = self.get_data(SIZE_ZERO)
325         d = upload_filehandle(self.u, StringIO(data))
326         d.addCallback(extract_uri)
327         d.addCallback(self._check_small, SIZE_ZERO)
328         return d
329
330     def test_filehandle_small(self):
331         data = self.get_data(SIZE_SMALL)
332         d = upload_filehandle(self.u, StringIO(data))
333         d.addCallback(extract_uri)
334         d.addCallback(self._check_small, SIZE_SMALL)
335         return d
336
337     def test_filehandle_large(self):
338         data = self.get_data(SIZE_LARGE)
339         d = upload_filehandle(self.u, StringIO(data))
340         d.addCallback(extract_uri)
341         d.addCallback(self._check_large, SIZE_LARGE)
342         return d
343
344     def test_filename_zero(self):
345         fn = "Uploader-test_filename_zero.data"
346         f = open(fn, "wb")
347         data = self.get_data(SIZE_ZERO)
348         f.write(data)
349         f.close()
350         d = upload_filename(self.u, fn)
351         d.addCallback(extract_uri)
352         d.addCallback(self._check_small, SIZE_ZERO)
353         return d
354
355     def test_filename_small(self):
356         fn = "Uploader-test_filename_small.data"
357         f = open(fn, "wb")
358         data = self.get_data(SIZE_SMALL)
359         f.write(data)
360         f.close()
361         d = upload_filename(self.u, fn)
362         d.addCallback(extract_uri)
363         d.addCallback(self._check_small, SIZE_SMALL)
364         return d
365
366     def test_filename_large(self):
367         fn = "Uploader-test_filename_large.data"
368         f = open(fn, "wb")
369         data = self.get_data(SIZE_LARGE)
370         f.write(data)
371         f.close()
372         d = upload_filename(self.u, fn)
373         d.addCallback(extract_uri)
374         d.addCallback(self._check_large, SIZE_LARGE)
375         return d
376
377 class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
378     def make_node(self, mode, num_servers=10):
379         self.node = FakeClient(mode, num_servers)
380         self.u = upload.Uploader()
381         self.u.running = True
382         self.u.parent = self.node
383
384     def _check_large(self, newuri, size):
385         u = uri.from_string(newuri)
386         self.failUnless(isinstance(u, uri.CHKFileURI))
387         self.failUnless(isinstance(u.get_storage_index(), str))
388         self.failUnlessEqual(len(u.get_storage_index()), 16)
389         self.failUnless(isinstance(u.key, str))
390         self.failUnlessEqual(len(u.key), 16)
391         self.failUnlessEqual(u.size, size)
392
393     def test_first_error(self):
394         mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
395         self.make_node(mode)
396         d = upload_data(self.u, DATA)
397         d.addCallback(extract_uri)
398         d.addCallback(self._check_large, SIZE_LARGE)
399         return d
400
401     def test_first_error_all(self):
402         self.make_node("first-fail")
403         d = self.shouldFail(NoSharesError, "first_error_all",
404                             "peer selection failed",
405                             upload_data, self.u, DATA)
406         def _check((f,)):
407             self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
408             # there should also be a 'last failure was' message
409             self.failUnlessIn("ServerError", str(f.value))
410         d.addCallback(_check)
411         return d
412
413     def test_second_error(self):
414         # we want to make sure we make it to a third pass. This means that
415         # the first pass was insufficient to place all shares, and at least
416         # one of second pass servers (other than the last one) accepted a
417         # share (so we'll believe that a third pass will be useful). (if
418         # everyone but the last server throws an error, then we'll send all
419         # the remaining shares to the last server at the end of the second
420         # pass, and if that succeeds, we won't make it to a third pass).
421         #
422         # we can achieve this 97.5% of the time by using 40 servers, having
423         # 39 of them fail on the second request, leaving only one to succeed
424         # on the second request. (we need to keep the number of servers low
425         # enough to ensure a second pass with 100 shares).
426         mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
427         self.make_node(mode, 40)
428         d = upload_data(self.u, DATA)
429         d.addCallback(extract_uri)
430         d.addCallback(self._check_large, SIZE_LARGE)
431         return d
432
433     def test_second_error_all(self):
434         self.make_node("second-fail")
435         d = self.shouldFail(NotEnoughSharesError, "second_error_all",
436                             "peer selection failed",
437                             upload_data, self.u, DATA)
438         def _check((f,)):
439             self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
440             # there should also be a 'last failure was' message
441             self.failUnlessIn("ServerError", str(f.value))
442         d.addCallback(_check)
443         return d
444
445 class FullServer(unittest.TestCase):
446     def setUp(self):
447         self.node = FakeClient(mode="full")
448         self.u = upload.Uploader()
449         self.u.running = True
450         self.u.parent = self.node
451
452     def _should_fail(self, f):
453         self.failUnless(isinstance(f, Failure) and f.check(NoSharesError), f)
454
455     def test_data_large(self):
456         data = DATA
457         d = upload_data(self.u, data)
458         d.addBoth(self._should_fail)
459         return d
460
461 class PeerSelection(unittest.TestCase):
462
463     def make_client(self, num_servers=50):
464         self.node = FakeClient(mode="good", num_servers=num_servers)
465         self.u = upload.Uploader()
466         self.u.running = True
467         self.u.parent = self.node
468
469     def get_data(self, size):
470         return DATA[:size]
471
472     def _check_large(self, newuri, size):
473         u = uri.from_string(newuri)
474         self.failUnless(isinstance(u, uri.CHKFileURI))
475         self.failUnless(isinstance(u.get_storage_index(), str))
476         self.failUnlessEqual(len(u.get_storage_index()), 16)
477         self.failUnless(isinstance(u.key, str))
478         self.failUnlessEqual(len(u.key), 16)
479         self.failUnlessEqual(u.size, size)
480
481     def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
482         p = {"k": k,
483              "happy": happy,
484              "n": n,
485              "max_segment_size": max_segsize,
486              }
487         self.node.DEFAULT_ENCODING_PARAMETERS = p
488
489     def test_one_each(self):
490         # if we have 50 shares, and there are 50 peers, and they all accept a
491         # share, we should get exactly one share per peer
492
493         self.make_client()
494         data = self.get_data(SIZE_LARGE)
495         self.set_encoding_parameters(25, 30, 50)
496         d = upload_data(self.u, data)
497         d.addCallback(extract_uri)
498         d.addCallback(self._check_large, SIZE_LARGE)
499         def _check(res):
500             for p in self.node.last_peers:
501                 allocated = p.allocated
502                 self.failUnlessEqual(len(allocated), 1)
503                 self.failUnlessEqual(p.queries, 1)
504         d.addCallback(_check)
505         return d
506
507     def test_two_each(self):
508         # if we have 100 shares, and there are 50 peers, and they all accept
509         # all shares, we should get exactly two shares per peer
510
511         self.make_client()
512         data = self.get_data(SIZE_LARGE)
513         self.set_encoding_parameters(50, 75, 100)
514         d = upload_data(self.u, data)
515         d.addCallback(extract_uri)
516         d.addCallback(self._check_large, SIZE_LARGE)
517         def _check(res):
518             for p in self.node.last_peers:
519                 allocated = p.allocated
520                 self.failUnlessEqual(len(allocated), 2)
521                 self.failUnlessEqual(p.queries, 2)
522         d.addCallback(_check)
523         return d
524
525     def test_one_each_plus_one_extra(self):
526         # if we have 51 shares, and there are 50 peers, then one peer gets
527         # two shares and the rest get just one
528
529         self.make_client()
530         data = self.get_data(SIZE_LARGE)
531         self.set_encoding_parameters(24, 41, 51)
532         d = upload_data(self.u, data)
533         d.addCallback(extract_uri)
534         d.addCallback(self._check_large, SIZE_LARGE)
535         def _check(res):
536             got_one = []
537             got_two = []
538             for p in self.node.last_peers:
539                 allocated = p.allocated
540                 self.failUnless(len(allocated) in (1,2), len(allocated))
541                 if len(allocated) == 1:
542                     self.failUnlessEqual(p.queries, 1)
543                     got_one.append(p)
544                 else:
545                     self.failUnlessEqual(p.queries, 2)
546                     got_two.append(p)
547             self.failUnlessEqual(len(got_one), 49)
548             self.failUnlessEqual(len(got_two), 1)
549         d.addCallback(_check)
550         return d
551
552     def test_four_each(self):
553         # if we have 200 shares, and there are 50 peers, then each peer gets
554         # 4 shares. The design goal is to accomplish this with only two
555         # queries per peer.
556
557         self.make_client()
558         data = self.get_data(SIZE_LARGE)
559         self.set_encoding_parameters(100, 150, 200)
560         d = upload_data(self.u, data)
561         d.addCallback(extract_uri)
562         d.addCallback(self._check_large, SIZE_LARGE)
563         def _check(res):
564             for p in self.node.last_peers:
565                 allocated = p.allocated
566                 self.failUnlessEqual(len(allocated), 4)
567                 self.failUnlessEqual(p.queries, 2)
568         d.addCallback(_check)
569         return d
570
571     def test_three_of_ten(self):
572         # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
573         # 4+4+2
574
575         self.make_client(3)
576         data = self.get_data(SIZE_LARGE)
577         self.set_encoding_parameters(3, 5, 10)
578         d = upload_data(self.u, data)
579         d.addCallback(extract_uri)
580         d.addCallback(self._check_large, SIZE_LARGE)
581         def _check(res):
582             counts = {}
583             for p in self.node.last_peers:
584                 allocated = p.allocated
585                 counts[len(allocated)] = counts.get(len(allocated), 0) + 1
586             histogram = [counts.get(i, 0) for i in range(5)]
587             self.failUnlessEqual(histogram, [0,0,0,2,1])
588         d.addCallback(_check)
589         return d
590
591     def test_some_big_some_small(self):
592         # 10 shares, 20 servers, but half the servers don't support a
593         # share-size large enough for our file
594         mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)])
595         self.node = FakeClient(mode, num_servers=20)
596         self.u = upload.Uploader()
597         self.u.running = True
598         self.u.parent = self.node
599
600         data = self.get_data(SIZE_LARGE)
601         self.set_encoding_parameters(3, 5, 10)
602         d = upload_data(self.u, data)
603         d.addCallback(extract_uri)
604         d.addCallback(self._check_large, SIZE_LARGE)
605         def _check(res):
606             # we should have put one share each on the big peers, and zero
607             # shares on the small peers
608             total_allocated = 0
609             for p in self.node.last_peers:
610                 if p.mode == "good":
611                     self.failUnlessEqual(len(p.allocated), 1)
612                 elif p.mode == "small":
613                     self.failUnlessEqual(len(p.allocated), 0)
614                 total_allocated += len(p.allocated)
615             self.failUnlessEqual(total_allocated, 10)
616         d.addCallback(_check)
617         return d
618
619
620 class StorageIndex(unittest.TestCase):
621     def test_params_must_matter(self):
622         DATA = "I am some data"
623         u = upload.Data(DATA, convergence="")
624         eu = upload.EncryptAnUploadable(u)
625         d1 = eu.get_storage_index()
626
627         # CHK means the same data should encrypt the same way
628         u = upload.Data(DATA, convergence="")
629         eu = upload.EncryptAnUploadable(u)
630         d1a = eu.get_storage_index()
631
632         # but if we use a different convergence string it should be different
633         u = upload.Data(DATA, convergence="wheee!")
634         eu = upload.EncryptAnUploadable(u)
635         d1salt1 = eu.get_storage_index()
636
637         # and if we add yet a different convergence it should be different again
638         u = upload.Data(DATA, convergence="NOT wheee!")
639         eu = upload.EncryptAnUploadable(u)
640         d1salt2 = eu.get_storage_index()
641
642         # and if we use the first string again it should be the same as last time
643         u = upload.Data(DATA, convergence="wheee!")
644         eu = upload.EncryptAnUploadable(u)
645         d1salt1a = eu.get_storage_index()
646
647         # and if we change the encoding parameters, it should be different (from the same convergence string with different encoding parameters)
648         u = upload.Data(DATA, convergence="")
649         u.encoding_param_k = u.default_encoding_param_k + 1
650         eu = upload.EncryptAnUploadable(u)
651         d2 = eu.get_storage_index()
652
653         # and if we use a random key, it should be different than the CHK
654         u = upload.Data(DATA, convergence=None)
655         eu = upload.EncryptAnUploadable(u)
656         d3 = eu.get_storage_index()
657         # and different from another instance
658         u = upload.Data(DATA, convergence=None)
659         eu = upload.EncryptAnUploadable(u)
660         d4 = eu.get_storage_index()
661
662         d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
663         def _done(res):
664             si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
665             self.failUnlessEqual(si1, si1a)
666             self.failIfEqual(si1, si2)
667             self.failIfEqual(si1, si3)
668             self.failIfEqual(si1, si4)
669             self.failIfEqual(si3, si4)
670             self.failIfEqual(si1salt1, si1)
671             self.failIfEqual(si1salt1, si1salt2)
672             self.failIfEqual(si1salt2, si1)
673             self.failUnlessEqual(si1salt1, si1salt1a)
674         d.addCallback(_done)
675         return d
676
677 class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
678     ShouldFailMixin):
679     def _do_upload_with_broken_servers(self, servers_to_break):
680         """
681         I act like a normal upload, but before I send the results of
682         Tahoe2PeerSelector to the Encoder, I break the first servers_to_break
683         PeerTrackers in the used_peers part of the return result.
684         """
685         assert self.g, "I tried to find a grid at self.g, but failed"
686         broker = self.g.clients[0].storage_broker
687         sh     = self.g.clients[0]._secret_holder
688         data = upload.Data("data" * 10000, convergence="")
689         data.encoding_param_k = 3
690         data.encoding_param_happy = 4
691         data.encoding_param_n = 10
692         uploadable = upload.EncryptAnUploadable(data)
693         encoder = encode.Encoder()
694         encoder.set_encrypted_uploadable(uploadable)
695         status = upload.UploadStatus()
696         selector = upload.Tahoe2PeerSelector("dglev", "test", status)
697         storage_index = encoder.get_param("storage_index")
698         share_size = encoder.get_param("share_size")
699         block_size = encoder.get_param("block_size")
700         num_segments = encoder.get_param("num_segments")
701         d = selector.get_shareholders(broker, sh, storage_index,
702                                       share_size, block_size, num_segments,
703                                       10, 4)
704         def _have_shareholders((used_peers, already_peers)):
705             assert servers_to_break <= len(used_peers)
706             for index in xrange(servers_to_break):
707                 server = list(used_peers)[index]
708                 for share in server.buckets.keys():
709                     server.buckets[share].abort()
710             buckets = {}
711             servermap = already_peers.copy()
712             for peer in used_peers:
713                 buckets.update(peer.buckets)
714                 for bucket in peer.buckets:
715                     servermap[bucket] = peer.peerid
716             encoder.set_shareholders(buckets, servermap)
717             d = encoder.start()
718             return d
719         d.addCallback(_have_shareholders)
720         return d
721
722     def _add_server(self, server_number, readonly=False):
723         assert self.g, "I tried to find a grid at self.g, but failed"
724         assert self.shares, "I tried to find shares at self.shares, but failed"
725         ss = self.g.make_server(server_number, readonly)
726         self.g.add_server(server_number, ss)
727
728     def _add_server_with_share(self, server_number, share_number=None,
729                                readonly=False):
730         self._add_server(server_number, readonly)
731         if share_number:
732             self._copy_share_to_server(share_number, server_number)
733
734     def _copy_share_to_server(self, share_number, server_number):
735         ss = self.g.servers_by_number[server_number]
736         # Copy share i from the directory associated with the first 
737         # storage server to the directory associated with this one.
738         assert self.g, "I tried to find a grid at self.g, but failed"
739         assert self.shares, "I tried to find shares at self.shares, but failed"
740         old_share_location = self.shares[share_number][2]
741         new_share_location = os.path.join(ss.storedir, "shares")
742         si = uri.from_string(self.uri).get_storage_index()
743         new_share_location = os.path.join(new_share_location,
744                                           storage_index_to_dir(si))
745         if not os.path.exists(new_share_location):
746             os.makedirs(new_share_location)
747         new_share_location = os.path.join(new_share_location,
748                                           str(share_number))
749         shutil.copy(old_share_location, new_share_location)
750         shares = self.find_shares(self.uri)
751         # Make sure that the storage server has the share.
752         self.failUnless((share_number, ss.my_nodeid, new_share_location)
753                         in shares)
754
755
756     def _setup_and_upload(self):
757         """
758         I set up a NoNetworkGrid with a single server and client,
759         upload a file to it, store its uri in self.uri, and store its
760         sharedata in self.shares.
761         """
762         self.set_up_grid(num_clients=1, num_servers=1)
763         client = self.g.clients[0]
764         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
765         data = upload.Data("data" * 10000, convergence="")
766         self.data = data
767         d = client.upload(data)
768         def _store_uri(ur):
769             self.uri = ur.uri
770         d.addCallback(_store_uri)
771         d.addCallback(lambda ign:
772             self.find_shares(self.uri))
773         def _store_shares(shares):
774             self.shares = shares
775         d.addCallback(_store_shares)
776         return d
777
778     def test_configure_parameters(self):
779         self.basedir = self.mktemp()
780         hooks = {0: self._set_up_nodes_extra_config}
781         self.set_up_grid(client_config_hooks=hooks)
782         c0 = self.g.clients[0]
783
784         DATA = "data" * 100
785         u = upload.Data(DATA, convergence="")
786         d = c0.upload(u)
787         d.addCallback(lambda ur: c0.create_node_from_uri(ur.uri))
788         m = monitor.Monitor()
789         d.addCallback(lambda fn: fn.check(m))
790         def _check(cr):
791             data = cr.get_data()
792             self.failUnlessEqual(data["count-shares-needed"], 7)
793             self.failUnlessEqual(data["count-shares-expected"], 12)
794         d.addCallback(_check)
795         return d
796
797     def _setUp(self, ns):
798         # Used by test_happy_semantics and test_prexisting_share_behavior
799         # to set up the grid.
800         self.node = FakeClient(mode="good", num_servers=ns)
801         self.u = upload.Uploader()
802         self.u.running = True
803         self.u.parent = self.node
804
805     def test_happy_semantics(self):
806         self._setUp(2)
807         DATA = upload.Data("kittens" * 10000, convergence="")
808         # These parameters are unsatisfiable with the client that we've made
809         # -- we'll use them to test that the semnatics work correctly.
810         self.set_encoding_parameters(k=3, happy=5, n=10)
811         d = self.shouldFail(NotEnoughSharesError, "test_happy_semantics",
812                             "shares could only be placed on 2 servers "
813                             "(5 were requested)",
814                             self.u.upload, DATA)
815         # Let's reset the client to have 10 servers
816         d.addCallback(lambda ign:
817             self._setUp(10))
818         # These parameters are satisfiable with the client we've made.
819         d.addCallback(lambda ign:
820             self.set_encoding_parameters(k=3, happy=5, n=10))
821         # this should work
822         d.addCallback(lambda ign:
823             self.u.upload(DATA))
824         # Let's reset the client to have 7 servers
825         # (this is less than n, but more than h)
826         d.addCallback(lambda ign:
827             self._setUp(7))
828         # These encoding parameters should still be satisfiable with our 
829         # client setup
830         d.addCallback(lambda ign:
831             self.set_encoding_parameters(k=3, happy=5, n=10))
832         # This, then, should work.
833         d.addCallback(lambda ign:
834             self.u.upload(DATA))
835         return d
836
837     def test_problem_layouts(self):
838         self.basedir = self.mktemp()
839         # This scenario is at 
840         # http://allmydata.org/trac/tahoe/ticket/778#comment:52
841         #
842         # The scenario in comment:52 proposes that we have a layout
843         # like:
844         # server 1: share 1
845         # server 2: share 1
846         # server 3: share 1
847         # server 4: shares 2 - 10
848         # To get access to the shares, we will first upload to one 
849         # server, which will then have shares 1 - 10. We'll then 
850         # add three new servers, configure them to not accept any new
851         # shares, then write share 1 directly into the serverdir of each.
852         # Then each of servers 1 - 3 will report that they have share 1, 
853         # and will not accept any new share, while server 4 will report that
854         # it has shares 2 - 10 and will accept new shares.
855         # We'll then set 'happy' = 4, and see that an upload fails
856         # (as it should)
857         d = self._setup_and_upload()
858         d.addCallback(lambda ign:
859             self._add_server_with_share(1, 0, True))
860         d.addCallback(lambda ign:
861             self._add_server_with_share(2, 0, True))
862         d.addCallback(lambda ign:
863             self._add_server_with_share(3, 0, True))
864         # Remove the first share from server 0.
865         def _remove_share_0():
866             share_location = self.shares[0][2]
867             os.remove(share_location)
868         d.addCallback(lambda ign:
869             _remove_share_0())
870         # Set happy = 4 in the client.
871         def _prepare():
872             client = self.g.clients[0]
873             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
874             return client
875         d.addCallback(lambda ign:
876             _prepare())
877         # Uploading data should fail
878         d.addCallback(lambda client:
879             self.shouldFail(NotEnoughSharesError, "test_happy_semantics",
880                             "shares could only be placed on 1 servers "
881                             "(4 were requested)",
882                             client.upload, upload.Data("data" * 10000,
883                                                        convergence="")))
884
885
886         # This scenario is at
887         # http://allmydata.org/trac/tahoe/ticket/778#comment:53
888         #
889         # Set up the grid to have one server
890         def _change_basedir(ign):
891             self.basedir = self.mktemp()
892         d.addCallback(_change_basedir)
893         d.addCallback(lambda ign:
894             self._setup_and_upload())
895         # We want to have a layout like this:
896         # server 1: share 1
897         # server 2: share 2
898         # server 3: share 3
899         # server 4: shares 1 - 10
900         # (this is an expansion of Zooko's example because it is easier
901         #  to code, but it will fail in the same way)
902         # To start, we'll create a server with shares 1-10 of the data 
903         # we're about to upload.
904         # Next, we'll add three new servers to our NoNetworkGrid. We'll add
905         # one share from our initial upload to each of these.
906         # The counterintuitive ordering of the share numbers is to deal with 
907         # the permuting of these servers -- distributing the shares this 
908         # way ensures that the Tahoe2PeerSelector sees them in the order 
909         # described above.
910         d.addCallback(lambda ign:
911             self._add_server_with_share(server_number=1, share_number=2))
912         d.addCallback(lambda ign:
913             self._add_server_with_share(server_number=2, share_number=0))
914         d.addCallback(lambda ign:
915             self._add_server_with_share(server_number=3, share_number=1))
916         # So, we now have the following layout:
917         # server 0: shares 0 - 9
918         # server 1: share 0
919         # server 2: share 1
920         # server 3: share 2
921         # We want to change the 'happy' parameter in the client to 4. 
922         # We then want to feed the upload process a list of peers that
923         # server 0 is at the front of, so we trigger Zooko's scenario.
924         # Ideally, a reupload of our original data should work.
925         def _reset_encoding_parameters(ign):
926             client = self.g.clients[0]
927             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
928             return client
929         d.addCallback(_reset_encoding_parameters)
930         # We need this to get around the fact that the old Data 
931         # instance already has a happy parameter set.
932         d.addCallback(lambda client:
933             client.upload(upload.Data("data" * 10000, convergence="")))
934
935
936         # This scenario is basically comment:53, but with the order reversed;
937         # this means that the Tahoe2PeerSelector sees
938         # server 0: shares 1-10
939         # server 1: share 1
940         # server 2: share 2
941         # server 3: share 3
942         d.addCallback(_change_basedir)
943         d.addCallback(lambda ign:
944             self._setup_and_upload())
945         d.addCallback(lambda ign:
946             self._add_server_with_share(server_number=2, share_number=0))
947         d.addCallback(lambda ign:
948             self._add_server_with_share(server_number=3, share_number=1))
949         d.addCallback(lambda ign:
950             self._add_server_with_share(server_number=1, share_number=2))
951         # Copy all of the other shares to server number 2
952         def _copy_shares(ign):
953             for i in xrange(1, 10):
954                 self._copy_share_to_server(i, 2)
955         d.addCallback(_copy_shares)
956         # Remove the first server, and add a placeholder with share 0
957         d.addCallback(lambda ign:
958             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
959         d.addCallback(lambda ign:
960             self._add_server_with_share(server_number=0, share_number=0))
961         # Now try uploading. 
962         d.addCallback(_reset_encoding_parameters)
963         d.addCallback(lambda client:
964             client.upload(upload.Data("data" * 10000, convergence="")))
965         # Try the same thing, but with empty servers after the first one
966         # We want to make sure that Tahoe2PeerSelector will redistribute
967         # shares as necessary, not simply discover an existing layout.
968         d.addCallback(_change_basedir)
969         d.addCallback(lambda ign:
970             self._setup_and_upload())
971         d.addCallback(lambda ign:
972             self._add_server(server_number=2))
973         d.addCallback(lambda ign:
974             self._add_server(server_number=3))
975         d.addCallback(lambda ign:
976             self._add_server(server_number=1))
977         d.addCallback(_copy_shares)
978         d.addCallback(lambda ign:
979             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
980         d.addCallback(lambda ign:
981             self._add_server(server_number=0))
982         d.addCallback(_reset_encoding_parameters)
983         d.addCallback(lambda client:
984             client.upload(upload.Data("data" * 10000, convergence="")))
985         # Try the following layout
986         # server 0: shares 1-10
987         # server 1: share 1, read-only
988         # server 2: share 2, read-only
989         # server 3: share 3, read-only
990         d.addCallback(_change_basedir)
991         d.addCallback(lambda ign:
992             self._setup_and_upload())
993         d.addCallback(lambda ign:
994             self._add_server_with_share(server_number=2, share_number=0))
995         d.addCallback(lambda ign:
996             self._add_server_with_share(server_number=3, share_number=1,
997                                         readonly=True))
998         d.addCallback(lambda ign:
999             self._add_server_with_share(server_number=1, share_number=2,
1000                                         readonly=True))
1001         # Copy all of the other shares to server number 2
1002         d.addCallback(_copy_shares)
1003         # Remove server 0, and add another in its place
1004         d.addCallback(lambda ign:
1005             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1006         d.addCallback(lambda ign:
1007             self._add_server_with_share(server_number=0, share_number=0,
1008                                         readonly=True))
1009         d.addCallback(_reset_encoding_parameters)
1010         d.addCallback(lambda client:
1011             client.upload(upload.Data("data" * 10000, convergence="")))
1012         return d
1013
1014
1015     def test_dropped_servers_in_encoder(self):
1016         def _set_basedir(ign=None):
1017             self.basedir = self.mktemp()
1018         _set_basedir()
1019         d = self._setup_and_upload();
1020         # Add 5 servers, with one share each from the original
1021         def _do_server_setup(ign):
1022             self._add_server_with_share(1, 1)
1023             self._add_server_with_share(2)
1024             self._add_server_with_share(3)
1025             self._add_server_with_share(4)
1026             self._add_server_with_share(5)
1027         d.addCallback(_do_server_setup)
1028         # remove the original server
1029         # (necessary to ensure that the Tahoe2PeerSelector will distribute
1030         #  all the shares)
1031         def _remove_server(ign):
1032             server = self.g.servers_by_number[0]
1033             self.g.remove_server(server.my_nodeid)
1034         d.addCallback(_remove_server)
1035         # This should succeed.
1036         d.addCallback(lambda ign:
1037             self._do_upload_with_broken_servers(1))
1038         # Now, do the same thing over again, but drop 2 servers instead
1039         # of 1. This should fail.
1040         d.addCallback(_set_basedir)
1041         d.addCallback(lambda ign:
1042             self._setup_and_upload())
1043         d.addCallback(_do_server_setup)
1044         d.addCallback(_remove_server)
1045         d.addCallback(lambda ign:
1046             self.shouldFail(NotEnoughSharesError,
1047                             "test_dropped_server_in_encoder", "",
1048                             self._do_upload_with_broken_servers, 2))
1049         return d
1050
1051
1052     def test_servers_with_unique_shares(self):
1053         # servers_with_unique_shares expects a dict of 
1054         # shnum => peerid as a preexisting shares argument.
1055         test1 = {
1056                  1 : "server1",
1057                  2 : "server2",
1058                  3 : "server3",
1059                  4 : "server4"
1060                 }
1061         unique_servers = upload.servers_with_unique_shares(test1)
1062         self.failUnlessEqual(4, len(unique_servers))
1063         for server in ["server1", "server2", "server3", "server4"]:
1064             self.failUnlessIn(server, unique_servers)
1065         test1[4] = "server1"
1066         # Now there should only be 3 unique servers.
1067         unique_servers = upload.servers_with_unique_shares(test1)
1068         self.failUnlessEqual(3, len(unique_servers))
1069         for server in ["server1", "server2", "server3"]:
1070             self.failUnlessIn(server, unique_servers)
1071         # servers_with_unique_shares expects a set of PeerTracker
1072         # instances as a used_peers argument, but only uses the peerid
1073         # instance variable to assess uniqueness. So we feed it some fake
1074         # PeerTrackers whose only important characteristic is that they 
1075         # have peerid set to something.
1076         class FakePeerTracker:
1077             pass
1078         trackers = []
1079         for server in ["server5", "server6", "server7", "server8"]:
1080             t = FakePeerTracker()
1081             t.peerid = server
1082             trackers.append(t)
1083         # Recall that there are 3 unique servers in test1. Since none of
1084         # those overlap with the ones in trackers, we should get 7 back
1085         unique_servers = upload.servers_with_unique_shares(test1, set(trackers))
1086         self.failUnlessEqual(7, len(unique_servers))
1087         expected_servers = ["server" + str(i) for i in xrange(1, 9)]
1088         expected_servers.remove("server4")
1089         for server in expected_servers:
1090             self.failUnlessIn(server, unique_servers)
1091         # Now add an overlapping server to trackers.
1092         t = FakePeerTracker()
1093         t.peerid = "server1"
1094         trackers.append(t)
1095         unique_servers = upload.servers_with_unique_shares(test1, set(trackers))
1096         self.failUnlessEqual(7, len(unique_servers))
1097         for server in expected_servers:
1098             self.failUnlessIn(server, unique_servers)
1099
1100
1101     def test_shares_by_server(self):
1102         test = {
1103                     1 : "server1",
1104                     2 : "server2",
1105                     3 : "server3",
1106                     4 : "server4"
1107                }
1108         shares_by_server = upload.shares_by_server(test)
1109         self.failUnlessEqual(set([1]), shares_by_server["server1"])
1110         self.failUnlessEqual(set([2]), shares_by_server["server2"])
1111         self.failUnlessEqual(set([3]), shares_by_server["server3"])
1112         self.failUnlessEqual(set([4]), shares_by_server["server4"])
1113         test1 = {
1114                     1 : "server1",
1115                     2 : "server1",
1116                     3 : "server1",
1117                     4 : "server2",
1118                     5 : "server2"
1119                 }
1120         shares_by_server = upload.shares_by_server(test1)
1121         self.failUnlessEqual(set([1, 2, 3]), shares_by_server["server1"])
1122         self.failUnlessEqual(set([4, 5]), shares_by_server["server2"])
1123
1124
1125     def _set_up_nodes_extra_config(self, clientdir):
1126         cfgfn = os.path.join(clientdir, "tahoe.cfg")
1127         oldcfg = open(cfgfn, "r").read()
1128         f = open(cfgfn, "wt")
1129         f.write(oldcfg)
1130         f.write("\n")
1131         f.write("[client]\n")
1132         f.write("shares.needed = 7\n")
1133         f.write("shares.total = 12\n")
1134         f.write("\n")
1135         f.close()
1136         return None
1137
1138 # TODO:
1139 #  upload with exactly 75 peers (shares_of_happiness)
1140 #  have a download fail
1141 #  cancel a download (need to implement more cancel stuff)