]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_upload.py
Rewrite immutable downloader (#798). This patch adds and updates unit tests.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_upload.py
1 # -*- coding: utf-8 -*-
2
3 import os, shutil
4 from cStringIO import StringIO
5 from twisted.trial import unittest
6 from twisted.python.failure import Failure
7 from twisted.internet import defer
8 from foolscap.api import fireEventually
9
10 import allmydata # for __full_version__
11 from allmydata import uri, monitor, client
12 from allmydata.immutable import upload, encode
13 from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
14 from allmydata.util import log
15 from allmydata.util.assertutil import precondition
16 from allmydata.util.deferredutil import DeferredListShouldSucceed
17 from allmydata.test.no_network import GridTestMixin
18 from allmydata.test.common_util import ShouldFailMixin
19 from allmydata.util.happinessutil import servers_of_happiness, \
20                                          shares_by_server, merge_peers
21 from allmydata.storage_client import StorageFarmBroker
22 from allmydata.storage.server import storage_index_to_dir
23
MiB = 1024*1024  # one mebibyte; used as the default max segment size below
25
def extract_uri(results):
    """Return the .uri attribute of an upload results object."""
    newuri = results.uri
    return newuri
28
# Some of these took longer than 480 seconds on Zandr's arm box, but this may
# have been due to an earlier test ERROR'ing out due to timeout, which seems
# to screw up subsequent tests.
timeout = 960  # per-test timeout in seconds; twisted.trial reads this module attribute
33
class Uploadable(unittest.TestCase):
    """Exercise the three IUploadable wrappers: FileHandle, FileName, Data."""

    def shouldEqual(self, data, expected):
        # read() results arrive as a list of native strings whose
        # concatenation is the expected payload
        self.failUnless(isinstance(data, list))
        for chunk in data:
            self.failUnless(isinstance(chunk, str))
        self.failUnlessEqual("".join(data), expected)

    def test_filehandle_random_key(self):
        return self._test_filehandle(convergence=None)

    def test_filehandle_convergent_encryption(self):
        return self._test_filehandle(convergence="some convergence string")

    def _test_filehandle(self, convergence):
        s = StringIO("a"*41)
        u = upload.FileHandle(s, convergence=convergence)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda ign: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda ign: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        # closing the uploadable must NOT close the underlying filehandle;
        # that privilege is reserved for the caller
        d.addCallback(lambda ign: u.close())
        d.addCallback(lambda ign: s.close())
        return d

    def _check_reads(self, u):
        # common read pattern: check the size, take a short read, then an
        # over-long read that must be truncated at EOF, then close
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda ign: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda ign: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda ign: u.close())
        return d

    def test_filename(self):
        basedir = "upload/Uploadable/test_filename"
        os.makedirs(basedir)
        fn = os.path.join(basedir, "file")
        f = open(fn, "w")
        f.write("a"*41)
        f.close()
        return self._check_reads(upload.FileName(fn, convergence=None))

    def test_data(self):
        return self._check_reads(upload.Data("a"*41, convergence=None))
89
class ServerError(Exception):
    """Raised by FakeStorageServer to simulate a remote server failure."""
    pass
92
class SetDEPMixin:
    """Mixin that installs fresh default encoding parameters on self.node."""
    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        # replace the node's defaults wholesale so each test starts clean
        self.node.DEFAULT_ENCODING_PARAMETERS = {
            "k": k,
            "happy": happy,
            "n": n,
            "max_segment_size": max_segsize,
        }
101
class FakeStorageServer:
    """An in-memory stand-in for a remote storage server.

    The 'mode' string controls misbehavior: "full" (no room),
    "already got them" (claims to hold every share), "small" (tiny
    share-size limit), "first-fail"/"second-fail" (raise ServerError on
    the first/second allocate_buckets query).
    """
    def __init__(self, mode):
        self.mode = mode
        self.allocated = []
        self.queries = 0
        # in "small" mode, advertise a tiny share-size limit so that peer
        # selection must route large files away from this server
        if mode == "small":
            max_share_size = 10
        else:
            max_share_size = 2**32
        self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                         { "maximum-immutable-share-size": max_share_size },
                         "application-version": str(allmydata.__full_version__),
                         }

    def callRemote(self, methname, *args, **kwargs):
        # defer the dispatch through the reactor, like a real remote call
        d = fireEventually()
        d.addCallback(lambda ign: getattr(self, methname)(*args, **kwargs))
        return d

    def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
                         sharenums, share_size, canary):
        if self.mode == "first-fail" and self.queries == 0:
            raise ServerError
        if self.mode == "second-fail" and self.queries == 1:
            raise ServerError
        self.queries += 1
        if self.mode == "full":
            # no room at all: accept nothing, allocate nothing
            return (set(), {})
        if self.mode == "already got them":
            # claim to already hold every requested share
            return (set(sharenums), {})
        writers = {}
        for shnum in sharenums:
            self.allocated.append( (storage_index, shnum) )
            writers[shnum] = FakeBucketWriter(share_size)
        return (set(), writers)
147
class FakeBucketWriter:
    # a diagnostic version of storageserver.BucketWriter: records writes
    # into an in-memory buffer and enforces the declared share size
    def __init__(self, size):
        self.data = StringIO()
        self.closed = False
        self._size = size

    def callRemote(self, methname, *args, **kwargs):
        # dispatch to the matching remote_* method via the reactor
        d = fireEventually()
        d.addCallback(lambda ign: getattr(self, "remote_" + methname)(*args, **kwargs))
        return d

    def callRemoteOnly(self, methname, *args, **kwargs):
        # fire-and-forget flavor: kick off the call, report nothing back
        self.callRemote(methname, *args, **kwargs)
        return None

    def remote_write(self, offset, data):
        precondition(not self.closed)
        precondition(offset >= 0)
        # writes must stay inside the size promised at allocation time
        precondition(offset+len(data) <= self._size,
                     "offset=%d + data=%d > size=%d" %
                     (offset, len(data), self._size))
        self.data.seek(offset)
        self.data.write(data)

    def remote_close(self):
        precondition(not self.closed)
        self.closed = True

    def remote_abort(self):
        pass
185
class FakeClient:
    """A minimal client whose storage broker is populated with
    FakeStorageServer instances; 'mode' is either one string applied to
    every server or a dict mapping server number to a mode string."""
    DEFAULT_ENCODING_PARAMETERS = {"k":25,
                                   "happy": 25,
                                   "n": 100,
                                   "max_segment_size": 1*MiB,
                                   }
    def __init__(self, mode="good", num_servers=50):
        self.num_servers = num_servers
        if type(mode) is str:
            # a bare string means: every server behaves this way
            uniform = mode
            mode = {}
            for i in range(num_servers):
                mode[i] = uniform
        self.storage_broker = StorageFarmBroker(None, permute_peers=True)
        self.last_peers = []
        for fakeid in range(self.num_servers):
            serverid = "%20d" % fakeid
            server = FakeStorageServer(mode[fakeid])
            self.storage_broker.test_add_server(serverid, server)
            self.last_peers.append(server)

    def log(self, *args, **kwargs):
        pass
    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS
    def get_storage_broker(self):
        return self.storage_broker
    _secret_holder = client.SecretHolder("lease secret", "convergence secret")
210
class GotTooFarError(Exception):
    """Raised by GiganticUploadable.read to abort a test once the uploader
    has consumed far more data than it ever should have."""
    pass
213
class GiganticUploadable(upload.FileHandle):
    """An uploadable that claims to hold 'size' bytes of zeros, without
    ever materializing them; reading past 1MB aborts the test."""
    def __init__(self, size):
        self._size = size
        self._fp = 0  # current read position

    def get_encryption_key(self):
        return defer.succeed("\x00" * 16)

    def get_size(self):
        return defer.succeed(self._size)

    def read(self, length):
        # never hand out more than what remains
        remaining = self._size - self._fp
        if length > remaining:
            length = remaining
        self._fp += length
        if self._fp > 1000000:
            # terminate the test early: size checks should have rejected
            # this file long before a megabyte was consumed
            raise GotTooFarError("we shouldn't be allowed to get this far")
        return defer.succeed(["\x00" * length])

    def close(self):
        pass
233
DATA = """
Once upon a time, there was a beautiful princess named Buttercup. She lived
in a magical land where every file was stored securely among millions of
machines, and nobody ever worried about their data being lost ever again.
The End.
"""
# DATA must be big enough that it cannot be stored inline as a LIT URI
assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD

SIZE_ZERO = 0  # empty file
SIZE_SMALL = 16  # under the LIT threshold: stored inline in the URI
SIZE_LARGE = len(DATA)  # over the threshold: erasure-coded CHK upload
245
def upload_data(uploader, data):
    """Upload a byte string with a random per-upload encryption key."""
    return uploader.upload(upload.Data(data, convergence=None))

def upload_filename(uploader, filename):
    """Upload the contents of a named file (random encryption key)."""
    return uploader.upload(upload.FileName(filename, convergence=None))

def upload_filehandle(uploader, fh):
    """Upload the contents of an open filehandle (random encryption key)."""
    return uploader.upload(upload.FileHandle(fh, convergence=None))
255
class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
    """Upload files of assorted sizes to a grid of well-behaved fake
    servers: zero/small files must produce LIT URIs, large ones CHK URIs,
    and absurdly large files must be rejected up front."""
    def setUp(self):
        self.node = FakeClient(mode="good")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _check_small(self, newuri, size):
        # small files are stored inline in a LiteralFileURI
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.LiteralFileURI))
        self.failUnlessEqual(len(u.data), size)

    def _check_large(self, newuri, size):
        # large files are erasure-coded and referenced by a CHK URI
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def get_data(self, size):
        # a size-byte prefix of the shared test corpus
        return DATA[:size]

    def test_too_large(self):
        # we've removed the 4GiB share size limit (see ticket #346 for
        # details), but still have an 8-byte field, so the limit is now
        # 2**64, so make sure we reject files larger than that.
        k = 3; happy = 7; n = 10
        self.set_encoding_parameters(k, happy, n)
        big = k*(2**64)
        data1 = GiganticUploadable(big)
        d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
                            "This file is too large to be uploaded (data_size)",
                            self.u.upload, data1)
        data2 = GiganticUploadable(big-3)
        d.addCallback(lambda res:
                      self.shouldFail(FileTooLargeError,
                                      "test_too_large-data2",
                                      "This file is too large to be uploaded (offsets)",
                                      self.u.upload, data2))
        # I don't know where the actual limit is.. it depends upon how large
        # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
        return d

    def test_data_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_data_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_data_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_data_large_odd_segments(self):
        data = self.get_data(SIZE_LARGE)
        segsize = int(SIZE_LARGE / 2.5)
        # we want 3 segments, since that's not a power of two
        self.set_encoding_parameters(25, 25, 100, segsize)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_filehandle_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_filehandle_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_filehandle_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_filename_zero(self):
        # same checks, but going through a file on disk
        fn = "Uploader-test_filename_zero.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_ZERO)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_filename_small(self):
        fn = "Uploader-test_filename_small.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_SMALL)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_filename_large(self):
        fn = "Uploader-test_filename_large.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_LARGE)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d
385
class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
    """Check uploader behavior when servers raise errors from
    allocate_buckets, using FakeStorageServer's "first-fail" and
    "second-fail" modes."""
    def make_node(self, mode, num_servers=10):
        self.node = FakeClient(mode, num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _check_large(self, newuri, size):
        # a successful large upload yields a well-formed CHK URI
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def test_first_error(self):
        # server 0 is good; the other nine fail their first query. With
        # happy=1 the upload should still succeed via server 0.
        mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
        self.make_node(mode)
        self.set_encoding_parameters(k=25, happy=1, n=50)
        d = upload_data(self.u, DATA)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_first_error_all(self):
        # every server fails immediately: the upload must fail with a
        # message naming the placement shortfall and the last error seen
        self.make_node("first-fail")
        d = self.shouldFail(UploadUnhappinessError, "first_error_all",
                            "peer selection failed",
                            upload_data, self.u, DATA)
        def _check((f,)):  # the callback receives a 1-tuple holding the Failure
            self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
            # there should also be a 'last failure was' message
            self.failUnlessIn("ServerError", str(f.value))
        d.addCallback(_check)
        return d

    def test_second_error(self):
        # we want to make sure we make it to a third pass. This means that
        # the first pass was insufficient to place all shares, and at least
        # one of second pass servers (other than the last one) accepted a
        # share (so we'll believe that a third pass will be useful). (if
        # everyone but the last server throws an error, then we'll send all
        # the remaining shares to the last server at the end of the second
        # pass, and if that succeeds, we won't make it to a third pass).
        #
        # we can achieve this 97.5% of the time by using 40 servers, having
        # 39 of them fail on the second request, leaving only one to succeed
        # on the second request. (we need to keep the number of servers low
        # enough to ensure a second pass with 100 shares).
        mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
        self.make_node(mode, 40)
        d = upload_data(self.u, DATA)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_second_error_all(self):
        # every server accepts the first query but fails the second: only
        # the first pass's 10 shares get placed, so the upload fails
        self.make_node("second-fail")
        d = self.shouldFail(UploadUnhappinessError, "second_error_all",
                            "peer selection failed",
                            upload_data, self.u, DATA)
        def _check((f,)):  # the callback receives a 1-tuple holding the Failure
            self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
            # there should also be a 'last failure was' message
            self.failUnlessIn("ServerError", str(f.value))
        d.addCallback(_check)
        return d
454
class FullServer(unittest.TestCase):
    """An upload must fail cleanly when every server reports it is full."""
    def setUp(self):
        self.node = FakeClient(mode="full")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _should_fail(self, f):
        # the deferred must fire with an UploadUnhappinessError failure
        self.failUnless(isinstance(f, Failure) and f.check(UploadUnhappinessError), f)

    def test_data_large(self):
        d = upload_data(self.u, DATA)
        d.addBoth(self._should_fail)
        return d
470
class PeerSelection(unittest.TestCase):
    """Check how shares are distributed across peers: the uploader should
    spread them as evenly as the encoding parameters allow, with as few
    queries per peer as possible."""

    def make_client(self, num_servers=50):
        self.node = FakeClient(mode="good", num_servers=num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def get_data(self, size):
        return DATA[:size]

    def _check_large(self, newuri, size):
        # a successful large upload yields a well-formed CHK URI
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        # NOTE(review): duplicates SetDEPMixin.set_encoding_parameters;
        # this class simply does not inherit the mixin
        p = {"k": k,
             "happy": happy,
             "n": n,
             "max_segment_size": max_segsize,
             }
        self.node.DEFAULT_ENCODING_PARAMETERS = p

    def test_one_each(self):
        # if we have 50 shares, and there are 50 peers, and they all accept a
        # share, we should get exactly one share per peer

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(25, 30, 50)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 1)
                self.failUnlessEqual(p.queries, 1)
        d.addCallback(_check)
        return d

    def test_two_each(self):
        # if we have 100 shares, and there are 50 peers, and they all accept
        # all shares, we should get exactly two shares per peer

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        # if there are 50 peers, then happy needs to be <= 50
        self.set_encoding_parameters(50, 50, 100)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 2)
                self.failUnlessEqual(p.queries, 2)
        d.addCallback(_check)
        return d

    def test_one_each_plus_one_extra(self):
        # if we have 51 shares, and there are 50 peers, then one peer gets
        # two shares and the rest get just one

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(24, 41, 51)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            # partition the peers by how many shares they accepted
            got_one = []
            got_two = []
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnless(len(allocated) in (1,2), len(allocated))
                if len(allocated) == 1:
                    self.failUnlessEqual(p.queries, 1)
                    got_one.append(p)
                else:
                    self.failUnlessEqual(p.queries, 2)
                    got_two.append(p)
            self.failUnlessEqual(len(got_one), 49)
            self.failUnlessEqual(len(got_two), 1)
        d.addCallback(_check)
        return d

    def test_four_each(self):
        # if we have 200 shares, and there are 50 peers, then each peer gets
        # 4 shares. The design goal is to accomplish this with only two
        # queries per peer.

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        # if there are 50 peers, then happy should be no more than 50 if
        # we want this to work.
        self.set_encoding_parameters(100, 50, 200)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 4)
                self.failUnlessEqual(p.queries, 2)
        d.addCallback(_check)
        return d

    def test_three_of_ten(self):
        # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
        # 4+4+2

        self.make_client(3)
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 3, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            # histogram[i] counts peers that got exactly i shares
            counts = {}
            for p in self.node.last_peers:
                allocated = p.allocated
                counts[len(allocated)] = counts.get(len(allocated), 0) + 1
            histogram = [counts.get(i, 0) for i in range(5)]
            self.failUnlessEqual(histogram, [0,0,0,2,1])
        d.addCallback(_check)
        return d

    def test_some_big_some_small(self):
        # 10 shares, 20 servers, but half the servers don't support a
        # share-size large enough for our file
        mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)])
        self.node = FakeClient(mode, num_servers=20)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 5, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            # we should have put one share each on the big peers, and zero
            # shares on the small peers
            total_allocated = 0
            for p in self.node.last_peers:
                if p.mode == "good":
                    self.failUnlessEqual(len(p.allocated), 1)
                elif p.mode == "small":
                    self.failUnlessEqual(len(p.allocated), 0)
                total_allocated += len(p.allocated)
            self.failUnlessEqual(total_allocated, 10)
        d.addCallback(_check)
        return d
631
632
class StorageIndex(unittest.TestCase):
    """The storage index must be a function of the plaintext, the
    convergence string, and the encoding parameters: same inputs give the
    same index, and changing any of them (or using a random key) gives a
    different one."""
    def test_params_must_matter(self):
        DATA = "I am some data"
        u = upload.Data(DATA, convergence="")
        eu = upload.EncryptAnUploadable(u)
        d1 = eu.get_storage_index()

        # CHK means the same data should encrypt the same way
        u = upload.Data(DATA, convergence="")
        eu = upload.EncryptAnUploadable(u)
        d1a = eu.get_storage_index()

        # but if we use a different convergence string it should be different
        u = upload.Data(DATA, convergence="wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt1 = eu.get_storage_index()

        # and if we add yet a different convergence it should be different again
        u = upload.Data(DATA, convergence="NOT wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt2 = eu.get_storage_index()

        # and if we use the first string again it should be the same as last time
        u = upload.Data(DATA, convergence="wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt1a = eu.get_storage_index()

        # and if we change the encoding parameters, it should be different (from the same convergence string with different encoding parameters)
        u = upload.Data(DATA, convergence="")
        u.encoding_param_k = u.default_encoding_param_k + 1
        eu = upload.EncryptAnUploadable(u)
        d2 = eu.get_storage_index()

        # and if we use a random key, it should be different than the CHK
        u = upload.Data(DATA, convergence=None)
        eu = upload.EncryptAnUploadable(u)
        d3 = eu.get_storage_index()
        # and different from another instance
        u = upload.Data(DATA, convergence=None)
        eu = upload.EncryptAnUploadable(u)
        d4 = eu.get_storage_index()

        # wait for all eight storage-index deferreds, then compare
        d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
        def _done(res):
            si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
            self.failUnlessEqual(si1, si1a)
            self.failIfEqual(si1, si2)
            self.failIfEqual(si1, si3)
            self.failIfEqual(si1, si4)
            self.failIfEqual(si3, si4)
            self.failIfEqual(si1salt1, si1)
            self.failIfEqual(si1salt1, si1salt2)
            self.failIfEqual(si1salt2, si1)
            self.failUnlessEqual(si1salt1, si1salt1a)
        d.addCallback(_done)
        return d
689
def combinations(iterable, r):
    """Yield all r-length tuples of elements from iterable, in the order
    produced by itertools.combinations. Copied from the python docs
    because itertools.combinations was added in python 2.6 and we
    support >= 2.4.

    combinations('ABCD', 2) --> AB AC AD BC BD CD
    combinations(range(4), 3) --> 012 013 023 123
    """
    pool = tuple(iterable)
    n = len(pool)
    if r > n:
        return
    # explicit list() so the in-place index updates below keep working
    # even where range() does not return a mutable list (python 3)
    indices = list(range(r))
    yield tuple(pool[i] for i in indices)
    while True:
        # find the rightmost index that can still be advanced
        for i in reversed(range(r)):
            if indices[i] != i + n - r:
                break
        else:
            # every index is at its maximum value: we're done
            return
        indices[i] += 1
        # reset everything to the right of i to the smallest valid run
        for j in range(i+1, r):
            indices[j] = indices[j-1] + 1
        yield tuple(pool[i] for i in indices)
711
def is_happy_enough(servertoshnums, h, k):
    """Return True if the server-to-shares mapping achieves happiness
    level h, via a naive brute-force search (see
    src/allmydata/util/happinessutil.py for a better algorithm): every
    k-server subset of every h-server subset must together hold at
    least k distinct shares.

    servertoshnums: dict mapping server to a set of share numbers
    h: required happiness level
    k: shares needed to reconstruct the file
    """
    if len(servertoshnums) < h:
        return False
    # iterating the dict directly yields its keys on every python
    # version, unlike the former .iterkeys() call
    for happysetcombo in combinations(servertoshnums, h):
        for subsetcombo in combinations(happysetcombo, k):
            # collect the distinct shares held by this k-server subset;
            # an explicit loop avoids reduce(), which is not a builtin
            # on python 3
            shnums = set()
            for s in subsetcombo:
                shnums.update(servertoshnums[s])
            if len(shnums) < k:
                return False
    return True
727
728 class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
729     ShouldFailMixin):
730     def find_all_shares(self, unused=None):
731         """Locate shares on disk. Returns a dict that maps
732         server to set of sharenums.
733         """
734         assert self.g, "I tried to find a grid at self.g, but failed"
735         servertoshnums = {} # k: server, v: set(shnum)
736
737         for i, c in self.g.servers_by_number.iteritems():
738             for (dirp, dirns, fns) in os.walk(c.sharedir):
739                 for fn in fns:
740                     try:
741                         sharenum = int(fn)
742                     except TypeError:
743                         # Whoops, I guess that's not a share file then.
744                         pass
745                     else:
746                         servertoshnums.setdefault(i, set()).add(sharenum)
747
748         return servertoshnums
749
    def _do_upload_with_broken_servers(self, servers_to_break):
        """
        I act like a normal upload, but before I send the results of
        Tahoe2PeerSelector to the Encoder, I break the first servers_to_break
        PeerTrackers in the upload_servers part of the return result.
        """
        assert self.g, "I tried to find a grid at self.g, but failed"
        broker = self.g.clients[0].storage_broker
        sh     = self.g.clients[0]._secret_holder
        data = upload.Data("data" * 10000, convergence="")
        # 3-of-10 encoding with a happiness requirement of 4
        data.encoding_param_k = 3
        data.encoding_param_happy = 4
        data.encoding_param_n = 10
        uploadable = upload.EncryptAnUploadable(data)
        encoder = encode.Encoder()
        encoder.set_encrypted_uploadable(uploadable)
        status = upload.UploadStatus()
        selector = upload.Tahoe2PeerSelector("dglev", "test", status)
        storage_index = encoder.get_param("storage_index")
        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        d = selector.get_shareholders(broker, sh, storage_index,
                                      share_size, block_size, num_segments,
                                      10, 3, 4)
        def _have_shareholders((upload_servers, already_peers)):
            assert servers_to_break <= len(upload_servers)
            # break the first servers_to_break trackers by aborting every
            # bucket they were assigned
            for index in xrange(servers_to_break):
                server = list(upload_servers)[index]
                for share in server.buckets.keys():
                    server.buckets[share].abort()
            # then hand the (partially broken) share assignment to the
            # encoder anyway, exactly as a normal upload would
            buckets = {}
            servermap = already_peers.copy()
            for peer in upload_servers:
                buckets.update(peer.buckets)
                for bucket in peer.buckets:
                    servermap.setdefault(bucket, set()).add(peer.peerid)
            encoder.set_shareholders(buckets, servermap)
            d = encoder.start()
            return d
        d.addCallback(_have_shareholders)
        return d
792
793     def _has_happy_share_distribution(self):
794         servertoshnums = self.find_all_shares()
795         k = self.g.clients[0].DEFAULT_ENCODING_PARAMETERS['k']
796         h = self.g.clients[0].DEFAULT_ENCODING_PARAMETERS['happy']
797         return is_happy_enough(servertoshnums, h, k)
798
799     def _add_server(self, server_number, readonly=False):
800         assert self.g, "I tried to find a grid at self.g, but failed"
801         ss = self.g.make_server(server_number, readonly)
802         log.msg("just created a server, number: %s => %s" % (server_number, ss,))
803         self.g.add_server(server_number, ss)
804
805     def _add_server_with_share(self, server_number, share_number=None,
806                                readonly=False):
807         self._add_server(server_number, readonly)
808         if share_number is not None:
809             self._copy_share_to_server(share_number, server_number)
810
811
812     def _copy_share_to_server(self, share_number, server_number):
813         ss = self.g.servers_by_number[server_number]
814         # Copy share i from the directory associated with the first
815         # storage server to the directory associated with this one.
816         assert self.g, "I tried to find a grid at self.g, but failed"
817         assert self.shares, "I tried to find shares at self.shares, but failed"
818         old_share_location = self.shares[share_number][2]
819         new_share_location = os.path.join(ss.storedir, "shares")
820         si = uri.from_string(self.uri).get_storage_index()
821         new_share_location = os.path.join(new_share_location,
822                                           storage_index_to_dir(si))
823         if not os.path.exists(new_share_location):
824             os.makedirs(new_share_location)
825         new_share_location = os.path.join(new_share_location,
826                                           str(share_number))
827         if old_share_location != new_share_location:
828             shutil.copy(old_share_location, new_share_location)
829         shares = self.find_uri_shares(self.uri)
830         # Make sure that the storage server has the share.
831         self.failUnless((share_number, ss.my_nodeid, new_share_location)
832                         in shares)
833
834     def _setup_grid(self):
835         """
836         I set up a NoNetworkGrid with a single server and client.
837         """
838         self.set_up_grid(num_clients=1, num_servers=1)
839
840     def _setup_and_upload(self, **kwargs):
841         """
842         I set up a NoNetworkGrid with a single server and client,
843         upload a file to it, store its uri in self.uri, and store its
844         sharedata in self.shares.
845         """
846         self._setup_grid()
847         client = self.g.clients[0]
848         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
849         if "n" in kwargs and "k" in kwargs:
850             client.DEFAULT_ENCODING_PARAMETERS['k'] = kwargs['k']
851             client.DEFAULT_ENCODING_PARAMETERS['n'] = kwargs['n']
852         data = upload.Data("data" * 10000, convergence="")
853         self.data = data
854         d = client.upload(data)
855         def _store_uri(ur):
856             self.uri = ur.uri
857         d.addCallback(_store_uri)
858         d.addCallback(lambda ign:
859             self.find_uri_shares(self.uri))
860         def _store_shares(shares):
861             self.shares = shares
862         d.addCallback(_store_shares)
863         return d
864
865     def test_configure_parameters(self):
866         self.basedir = self.mktemp()
867         hooks = {0: self._set_up_nodes_extra_config}
868         self.set_up_grid(client_config_hooks=hooks)
869         c0 = self.g.clients[0]
870
871         DATA = "data" * 100
872         u = upload.Data(DATA, convergence="")
873         d = c0.upload(u)
874         d.addCallback(lambda ur: c0.create_node_from_uri(ur.uri))
875         m = monitor.Monitor()
876         d.addCallback(lambda fn: fn.check(m))
877         def _check(cr):
878             data = cr.get_data()
879             self.failUnlessEqual(data["count-shares-needed"], 7)
880             self.failUnlessEqual(data["count-shares-expected"], 12)
881         d.addCallback(_check)
882         return d
883
884
885     def _setUp(self, ns):
886         # Used by test_happy_semantics and test_preexisting_share_behavior
887         # to set up the grid.
888         self.node = FakeClient(mode="good", num_servers=ns)
889         self.u = upload.Uploader()
890         self.u.running = True
891         self.u.parent = self.node
892
893
894     def test_happy_semantics(self):
895         self._setUp(2)
896         DATA = upload.Data("kittens" * 10000, convergence="")
897         # These parameters are unsatisfiable with only 2 servers.
898         self.set_encoding_parameters(k=3, happy=5, n=10)
899         d = self.shouldFail(UploadUnhappinessError, "test_happy_semantics",
900                             "shares could be placed or found on only 2 "
901                             "server(s). We were asked to place shares on "
902                             "at least 5 server(s) such that any 3 of them "
903                             "have enough shares to recover the file",
904                             self.u.upload, DATA)
905         # Let's reset the client to have 10 servers
906         d.addCallback(lambda ign:
907             self._setUp(10))
908         # These parameters are satisfiable with 10 servers.
909         d.addCallback(lambda ign:
910             self.set_encoding_parameters(k=3, happy=5, n=10))
911         d.addCallback(lambda ign:
912             self.u.upload(DATA))
913         # Let's reset the client to have 7 servers
914         # (this is less than n, but more than h)
915         d.addCallback(lambda ign:
916             self._setUp(7))
917         # These parameters are satisfiable with 7 servers.
918         d.addCallback(lambda ign:
919             self.set_encoding_parameters(k=3, happy=5, n=10))
920         d.addCallback(lambda ign:
921             self.u.upload(DATA))
922         return d
923
    def test_aborted_shares(self):
        # An aborted BucketWriter must release its claim on the server,
        # so a later upload can place a share where the aborted one was.
        self.basedir = "upload/EncodingParameters/aborted_shares"
        self.set_up_grid(num_servers=4)
        c = self.g.clients[0]
        DATA = upload.Data(100* "kittens", convergence="")
        # These parameters are unsatisfiable with only 4 servers, but should
        # work with 5, as long as the original 4 are not stuck in the open
        # BucketWriter state (open() but never closed or aborted).
        parms = {"k":2, "happy":5, "n":5, "max_segment_size": 1*MiB}
        c.DEFAULT_ENCODING_PARAMETERS = parms
        d = self.shouldFail(UploadUnhappinessError, "test_aborted_shares",
                            "shares could be placed on only 4 "
                            "server(s) such that any 2 of them have enough "
                            "shares to recover the file, but we were asked "
                            "to place shares on at least 5 such servers",
                            c.upload, DATA)
        # now add the 5th server
        d.addCallback(lambda ign: self._add_server(4, False))
        # and this time the upload ought to succeed
        d.addCallback(lambda ign: c.upload(DATA))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))
        return d
947
948
    def test_problem_layout_comment_52(self):
        def _basedir():
            # Fresh basedir so each scenario gets its own grid on disk.
            self.basedir = self.mktemp()
        _basedir()
        # This scenario is at
        # http://allmydata.org/trac/tahoe/ticket/778#comment:52
        #
        # The scenario in comment:52 proposes that we have a layout
        # like:
        # server 0: shares 1 - 9
        # server 1: share 0, read-only
        # server 2: share 0, read-only
        # server 3: share 0, read-only
        # To get access to the shares, we will first upload to one
        # server, which will then have shares 0 - 9. We'll then
        # add three new servers, configure them to not accept any new
        # shares, then write share 0 directly into the serverdir of each,
        # and then remove share 0 from server 0 in the same way.
        # Then each of servers 1 - 3 will report that they have share 0,
        # and will not accept any new share, while server 0 will report that
        # it has shares 1 - 9 and will accept new shares.
        # We'll then set 'happy' = 4, and see that an upload fails
        # (as it should)
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=0,
                                        readonly=True))
        # Remove the first share from server 0.
        def _remove_share_0_from_server_0():
            share_location = self.shares[0][2]
            os.remove(share_location)
        d.addCallback(lambda ign:
            _remove_share_0_from_server_0())
        # Set happy = 4 in the client.
        def _prepare():
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
            return client
        d.addCallback(lambda ign:
            _prepare())
        # Uploading data should fail
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError,
                            "test_problem_layout_comment_52_test_1",
                            "shares could be placed or found on 4 server(s), "
                            "but they are not spread out evenly enough to "
                            "ensure that any 3 of these servers would have "
                            "enough shares to recover the file. "
                            "We were asked to place shares on at "
                            "least 4 servers such that any 3 of them have "
                            "enough shares to recover the file",
                            client.upload, upload.Data("data" * 10000,
                                                       convergence="")))

        # Do comment:52, but like this:
        # server 2: empty
        # server 3: share 0, read-only
        # server 1: share 0, read-only
        # server 0: shares 0-9
        d.addCallback(lambda ign:
            _basedir())
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server(server_number=2))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=0,
                                        readonly=True))
        # Set happy = 4 again for the second grid.
        def _prepare2():
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
            return client
        d.addCallback(lambda ign:
            _prepare2())
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError,
                            "test_problem_layout_comment_52_test_2",
                            "shares could be placed on only 3 server(s) such "
                            "that any 3 of them have enough shares to recover "
                            "the file, but we were asked to place shares on "
                            "at least 4 such servers.",
                            client.upload, upload.Data("data" * 10000,
                                                       convergence="")))
        return d
1042
1043
    def test_problem_layout_comment_53(self):
        # This scenario is at
        # http://allmydata.org/trac/tahoe/ticket/778#comment:53
        #
        # Set up the grid to have one server
        def _change_basedir(ign):
            # Fresh basedir so each sub-scenario gets its own grid.
            self.basedir = self.mktemp()
        _change_basedir(None)
        # We start by uploading all of the shares to one server.
        # Next, we'll add three new servers to our NoNetworkGrid. We'll add
        # one share from our initial upload to each of these.
        # The counterintuitive ordering of the share numbers is to deal with
        # the permuting of these servers -- distributing the shares this
        # way ensures that the Tahoe2PeerSelector sees them in the order
        # described below.
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1))
        # So, we now have the following layout:
        # server 0: shares 0 - 9
        # server 1: share 2
        # server 2: share 0
        # server 3: share 1
        # We change the 'happy' parameter in the client to 4.
        # The Tahoe2PeerSelector will see the peers permuted as:
        # 2, 3, 1, 0
        # Ideally, a reupload of our original data should work.
        def _reset_encoding_parameters(ign, happy=4):
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
            return client
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))


        # This scenario is basically comment:53, but changed so that the
        # Tahoe2PeerSelector sees the server with all of the shares before
        # any of the other servers.
        # The layout is:
        # server 2: shares 0 - 9
        # server 3: share 0
        # server 1: share 1
        # server 4: share 2
        # The Tahoe2PeerSelector sees the peers permuted as:
        # 2, 3, 1, 4
        # Note that server 0 has been replaced by server 4; this makes it
        # easier to ensure that the last server seen by Tahoe2PeerSelector
        # has only one share.
        d.addCallback(_change_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2))
        # Copy all of the other shares to server number 2
        def _copy_shares(ign):
            for i in xrange(0, 10):
                self._copy_share_to_server(i, 2)
        d.addCallback(_copy_shares)
        # Remove the first server, and add a placeholder with share 0
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=4, share_number=0))
        # Now try uploading.
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))


        # Try the same thing, but with empty servers after the first one
        # We want to make sure that Tahoe2PeerSelector will redistribute
        # shares as necessary, not simply discover an existing layout.
        # The layout is:
        # server 2: shares 0 - 9
        # server 3: empty
        # server 1: empty
        # server 4: empty
        d.addCallback(_change_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server(server_number=2))
        d.addCallback(lambda ign:
            self._add_server(server_number=3))
        d.addCallback(lambda ign:
            self._add_server(server_number=1))
        d.addCallback(lambda ign:
            self._add_server(server_number=4))
        d.addCallback(_copy_shares)
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        # Make sure that only as many shares as necessary to satisfy
        # servers of happiness were pushed.
        d.addCallback(lambda results:
            self.failUnlessEqual(results.pushed_shares, 3))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))
        return d
1158
    def test_problem_layout_ticket_1124(self):
        # Scenario from http://allmydata.org/trac/tahoe/ticket/1124
        self.basedir = self.mktemp()
        d = self._setup_and_upload(k=2, n=4)

        # server 0: shares 0, 1, 2, 3
        # server 1: shares 0, 3
        # server 2: share 1
        # server 3: share 2
        # With this layout, an upload should just be satisfied that the current distribution is good enough, right?
        def _setup(ign):
            self._add_server_with_share(server_number=0, share_number=None)
            self._add_server_with_share(server_number=1, share_number=0)
            self._add_server_with_share(server_number=2, share_number=1)
            self._add_server_with_share(server_number=3, share_number=2)
            # Copy shares
            self._copy_share_to_server(3, 1)
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
            return client

        d.addCallback(_setup)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))
        return d
    test_problem_layout_ticket_1124.todo = "Fix this after 1.7.1 release."
1186
    def test_happiness_with_some_readonly_peers(self):
        # Try the following layout
        # server 2: shares 0-9
        # server 4: share 0, read-only
        # server 3: share 1, read-only
        # server 1: share 2, read-only
        # The upload should succeed even though three of the four
        # servers cannot accept new shares.
        self.basedir = self.mktemp()
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2,
                                        readonly=True))
        # Copy all of the other shares to server number 2
        def _copy_shares(ign):
            for i in xrange(1, 10):
                self._copy_share_to_server(i, 2)
        d.addCallback(_copy_shares)
        # Remove server 0, and add another in its place
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=4, share_number=0,
                                        readonly=True))
        def _reset_encoding_parameters(ign, happy=4):
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
            return client
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))
        return d
1224
1225
    def test_happiness_with_all_readonly_peers(self):
        # server 3: share 1, read-only
        # server 1: share 2, read-only
        # server 2: shares 0-9, read-only
        # server 4: share 0, read-only
        # The idea with this test is to make sure that the survey of
        # read-only peers doesn't undercount servers of happiness
        self.basedir = self.mktemp()
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=4, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0,
                                        readonly=True))
        # Fill server 2 with the remaining shares so it holds 0-9.
        def _copy_shares(ign):
            for i in xrange(1, 10):
                self._copy_share_to_server(i, 2)
        d.addCallback(_copy_shares)
        # Drop the original (writable) server so only read-only ones remain.
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        def _reset_encoding_parameters(ign, happy=4):
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
            return client
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))
        return d
1263
1264
    def test_dropped_servers_in_encoder(self):
        # The Encoder does its own "servers_of_happiness" check if it
        # happens to lose a bucket during an upload (it assumes that
        # the layout presented to it satisfies "servers_of_happiness"
        # until a failure occurs)
        #
        # This test simulates an upload where servers break after peer
        # selection, but before they are written to.
        def _set_basedir(ign=None):
            # Fresh basedir for each of the three sub-scenarios below.
            self.basedir = self.mktemp()
        _set_basedir()
        d = self._setup_and_upload();
        # Add 5 servers
        def _do_server_setup(ign):
            self._add_server(server_number=1)
            self._add_server(server_number=2)
            self._add_server(server_number=3)
            self._add_server(server_number=4)
            self._add_server(server_number=5)
        d.addCallback(_do_server_setup)
        # remove the original server
        # (necessary to ensure that the Tahoe2PeerSelector will distribute
        #  all the shares)
        def _remove_server(ign):
            server = self.g.servers_by_number[0]
            self.g.remove_server(server.my_nodeid)
        d.addCallback(_remove_server)
        # This should succeed; we still have 4 servers, and the
        # happiness of the upload is 4.
        d.addCallback(lambda ign:
            self._do_upload_with_broken_servers(1))
        # Now, do the same thing over again, but drop 2 servers instead
        # of 1. This should fail, because servers_of_happiness is 4 and
        # we can't satisfy that.
        d.addCallback(_set_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(_do_server_setup)
        d.addCallback(_remove_server)
        d.addCallback(lambda ign:
            self.shouldFail(UploadUnhappinessError,
                            "test_dropped_servers_in_encoder",
                            "shares could be placed on only 3 server(s) "
                            "such that any 3 of them have enough shares to "
                            "recover the file, but we were asked to place "
                            "shares on at least 4",
                            self._do_upload_with_broken_servers, 2))
        # Now do the same thing over again, but make some of the servers
        # readonly, break some of the ones that aren't, and make sure that
        # happiness accounting is preserved.
        d.addCallback(_set_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        def _do_server_setup_2(ign):
            self._add_server(1)
            self._add_server(2)
            self._add_server(3)
            self._add_server_with_share(4, 7, readonly=True)
            self._add_server_with_share(5, 8, readonly=True)
        d.addCallback(_do_server_setup_2)
        d.addCallback(_remove_server)
        # Breaking one writable server still leaves a happy layout.
        d.addCallback(lambda ign:
            self._do_upload_with_broken_servers(1))
        d.addCallback(_set_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(_do_server_setup_2)
        d.addCallback(_remove_server)
        # Breaking two writable servers should make the upload unhappy.
        d.addCallback(lambda ign:
            self.shouldFail(UploadUnhappinessError,
                            "test_dropped_servers_in_encoder",
                            "shares could be placed on only 3 server(s) "
                            "such that any 3 of them have enough shares to "
                            "recover the file, but we were asked to place "
                            "shares on at least 4",
                            self._do_upload_with_broken_servers, 2))
        return d
1342
1343
1344     def test_merge_peers(self):
1345         # merge_peers merges a list of upload_servers and a dict of
1346         # shareid -> peerid mappings.
1347         shares = {
1348                     1 : set(["server1"]),
1349                     2 : set(["server2"]),
1350                     3 : set(["server3"]),
1351                     4 : set(["server4", "server5"]),
1352                     5 : set(["server1", "server2"]),
1353                  }
1354         # if not provided with a upload_servers argument, it should just
1355         # return the first argument unchanged.
1356         self.failUnlessEqual(shares, merge_peers(shares, set([])))
1357         class FakePeerTracker:
1358             pass
1359         trackers = []
1360         for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1361             t = FakePeerTracker()
1362             t.peerid = server
1363             t.buckets = [i]
1364             trackers.append(t)
1365         expected = {
1366                     1 : set(["server1"]),
1367                     2 : set(["server2"]),
1368                     3 : set(["server3"]),
1369                     4 : set(["server4", "server5"]),
1370                     5 : set(["server1", "server2", "server5"]),
1371                     6 : set(["server6"]),
1372                     7 : set(["server7"]),
1373                     8 : set(["server8"]),
1374                    }
1375         self.failUnlessEqual(expected, merge_peers(shares, set(trackers)))
1376         shares2 = {}
1377         expected = {
1378                     5 : set(["server5"]),
1379                     6 : set(["server6"]),
1380                     7 : set(["server7"]),
1381                     8 : set(["server8"]),
1382                    }
1383         self.failUnlessEqual(expected, merge_peers(shares2, set(trackers)))
1384         shares3 = {}
1385         trackers = []
1386         expected = {}
1387         for (i, server) in [(i, "server%d" % i) for i in xrange(10)]:
1388             shares3[i] = set([server])
1389             t = FakePeerTracker()
1390             t.peerid = server
1391             t.buckets = [i]
1392             trackers.append(t)
1393             expected[i] = set([server])
1394         self.failUnlessEqual(expected, merge_peers(shares3, set(trackers)))
1395
1396
    def test_servers_of_happiness_utility_function(self):
        """
        Exercise the servers_of_happiness() utility function directly.

        servers_of_happiness() returns the size of a maximum matching in
        the bipartite graph between peerids and share numbers; the cases
        below check that it neither under- nor over-counts for a variety
        of layouts, including the two puzzles from ticket #778.
        """
        # These tests are concerned with the servers_of_happiness()
        # utility function, and its underlying matching algorithm. Other
        # aspects of the servers_of_happiness behavior are tested
        # elsewhere. These tests exist to ensure that
        # servers_of_happiness doesn't under or overcount the happiness
        # value for given inputs.

        # servers_of_happiness expects a dict of
        # shnum => set(peerids) as a preexisting shares argument.
        test1 = {
                 1 : set(["server1"]),
                 2 : set(["server2"]),
                 3 : set(["server3"]),
                 4 : set(["server4"])
                }
        happy = servers_of_happiness(test1)
        self.failUnlessEqual(4, happy)
        test1[4] = set(["server1"])
        # We've added a duplicate server, so now servers_of_happiness
        # should be 3 instead of 4.
        happy = servers_of_happiness(test1)
        self.failUnlessEqual(3, happy)
        # The second argument of merge_peers should be a set of
        # objects with peerid and buckets as attributes. In actual use,
        # these will be PeerTracker instances, but for testing it is fine
        # to make a FakePeerTracker whose job is to hold those instance
        # variables to test that part.
        class FakePeerTracker:
            pass
        trackers = []
        for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
            t = FakePeerTracker()
            t.peerid = server
            t.buckets = [i]
            trackers.append(t)
        # Recall that test1 is a server layout with servers_of_happiness
        # = 3.  Since there isn't any overlap between the shnum ->
        # set([peerid]) correspondences in test1 and those in trackers,
        # the result here should be 7.
        test2 = merge_peers(test1, set(trackers))
        happy = servers_of_happiness(test2)
        self.failUnlessEqual(7, happy)
        # Now add an overlapping server to trackers. This is redundant,
        # so it should not cause the previously reported happiness value
        # to change.
        t = FakePeerTracker()
        t.peerid = "server1"
        t.buckets = [1]
        trackers.append(t)
        test2 = merge_peers(test1, set(trackers))
        happy = servers_of_happiness(test2)
        self.failUnlessEqual(7, happy)
        # An empty sharemap has no happiness at all.
        test = {}
        happy = servers_of_happiness(test)
        self.failUnlessEqual(0, happy)
        # Test a more substantial overlap between the trackers and the
        # existing assignments.
        test = {
            1 : set(['server1']),
            2 : set(['server2']),
            3 : set(['server3']),
            4 : set(['server4']),
        }
        trackers = []
        t = FakePeerTracker()
        t.peerid = 'server5'
        t.buckets = [4]
        trackers.append(t)
        t = FakePeerTracker()
        t.peerid = 'server6'
        t.buckets = [3, 5]
        trackers.append(t)
        # The value returned by servers_of_happiness is the size
        # of a maximum matching in the bipartite graph that
        # servers_of_happiness() makes between peerids and share
        # numbers. It should find something like this:
        # (server 1, share 1)
        # (server 2, share 2)
        # (server 3, share 3)
        # (server 5, share 4)
        # (server 6, share 5)
        #
        # and, since there are 5 edges in this matching, it should
        # return 5.
        test2 = merge_peers(test, set(trackers))
        happy = servers_of_happiness(test2)
        self.failUnlessEqual(5, happy)
        # Zooko's first puzzle:
        # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:156)
        #
        # server 1: shares 0, 1
        # server 2: shares 1, 2
        # server 3: share 2
        #
        # This should yield happiness of 3.
        test = {
            0 : set(['server1']),
            1 : set(['server1', 'server2']),
            2 : set(['server2', 'server3']),
        }
        self.failUnlessEqual(3, servers_of_happiness(test))
        # Zooko's second puzzle:
        # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:158)
        #
        # server 1: shares 0, 1
        # server 2: share 1
        #
        # This should yield happiness of 2.
        test = {
            0 : set(['server1']),
            1 : set(['server1', 'server2']),
        }
        self.failUnlessEqual(2, servers_of_happiness(test))
1511
1512
1513     def test_shares_by_server(self):
1514         test = dict([(i, set(["server%d" % i])) for i in xrange(1, 5)])
1515         sbs = shares_by_server(test)
1516         self.failUnlessEqual(set([1]), sbs["server1"])
1517         self.failUnlessEqual(set([2]), sbs["server2"])
1518         self.failUnlessEqual(set([3]), sbs["server3"])
1519         self.failUnlessEqual(set([4]), sbs["server4"])
1520         test1 = {
1521                     1 : set(["server1"]),
1522                     2 : set(["server1"]),
1523                     3 : set(["server1"]),
1524                     4 : set(["server2"]),
1525                     5 : set(["server2"])
1526                 }
1527         sbs = shares_by_server(test1)
1528         self.failUnlessEqual(set([1, 2, 3]), sbs["server1"])
1529         self.failUnlessEqual(set([4, 5]), sbs["server2"])
1530         # This should fail unless the peerid part of the mapping is a set
1531         test2 = {1: "server1"}
1532         self.shouldFail(AssertionError,
1533                        "test_shares_by_server",
1534                        "",
1535                        shares_by_server, test2)
1536
1537
1538     def test_existing_share_detection(self):
1539         self.basedir = self.mktemp()
1540         d = self._setup_and_upload()
1541         # Our final setup should look like this:
1542         # server 1: shares 0 - 9, read-only
1543         # server 2: empty
1544         # server 3: empty
1545         # server 4: empty
1546         # The purpose of this test is to make sure that the peer selector
1547         # knows about the shares on server 1, even though it is read-only.
1548         # It used to simply filter these out, which would cause the test
1549         # to fail when servers_of_happiness = 4.
1550         d.addCallback(lambda ign:
1551             self._add_server_with_share(1, 0, True))
1552         d.addCallback(lambda ign:
1553             self._add_server(2))
1554         d.addCallback(lambda ign:
1555             self._add_server(3))
1556         d.addCallback(lambda ign:
1557             self._add_server(4))
1558         def _copy_shares(ign):
1559             for i in xrange(1, 10):
1560                 self._copy_share_to_server(i, 1)
1561         d.addCallback(_copy_shares)
1562         d.addCallback(lambda ign:
1563             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1564         def _prepare_client(ign):
1565             client = self.g.clients[0]
1566             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1567             return client
1568         d.addCallback(_prepare_client)
1569         d.addCallback(lambda client:
1570             client.upload(upload.Data("data" * 10000, convergence="")))
1571         d.addCallback(lambda ign:
1572             self.failUnless(self._has_happy_share_distribution()))
1573         return d
1574
1575
    def test_query_counting(self):
        # If peer selection fails, Tahoe2PeerSelector prints out a lot
        # of helpful diagnostic information, including query stats.
        # This test helps make sure that that information is accurate.
        #
        # Three scenarios are checked, each with an unsatisfiable
        # happy=45 so that the UploadUnhappinessError message (and its
        # query counts) can be inspected:
        #   1. ten writable servers, no shares anywhere
        #   2. ten writable plus two read-only servers
        #   3. ten writable servers with all shares pre-copied onto the
        #      first one the selector queries
        self.basedir = self.mktemp()
        d = self._setup_and_upload()
        def _setup(ign):
            for i in xrange(1, 11):
                self._add_server(server_number=i)
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
            c = self.g.clients[0]
            # We set happy to an unsatisfiable value so that we can check the
            # counting in the exception message. The same progress message
            # is also used when the upload is successful, but in that case it
            # only gets written to a log, so we can't see what it says.
            c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
            return c
        d.addCallback(_setup)
        d.addCallback(lambda c:
            self.shouldFail(UploadUnhappinessError, "test_query_counting",
                            "10 queries placed some shares",
                            c.upload, upload.Data("data" * 10000,
                                                  convergence="")))
        # Now try with some readonly servers. We want to make sure that
        # the readonly peer share discovery phase is counted correctly.
        def _reset(ign):
            self.basedir = self.mktemp()
            self.g = None
        d.addCallback(_reset)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        def _then(ign):
            for i in xrange(1, 11):
                self._add_server(server_number=i)
            self._add_server(server_number=11, readonly=True)
            self._add_server(server_number=12, readonly=True)
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
            c = self.g.clients[0]
            c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
            return c
        d.addCallback(_then)
        d.addCallback(lambda c:
            self.shouldFail(UploadUnhappinessError, "test_query_counting",
                            "2 placed none (of which 2 placed none due to "
                            "the server being full",
                            c.upload, upload.Data("data" * 10000,
                                                  convergence="")))
        # Now try the case where the upload process finds a bunch of the
        # shares that it wants to place on the first server, including
        # the one that it wanted to allocate there. Though no shares will
        # be allocated in this request, it should still be called
        # productive, since it caused some homeless shares to be
        # removed.
        d.addCallback(_reset)
        d.addCallback(lambda ign:
            self._setup_and_upload())

        def _next(ign):
            for i in xrange(1, 11):
                self._add_server(server_number=i)
            # Copy all of the shares to server 9, since that will be
            # the first one that the selector sees.
            for i in xrange(10):
                self._copy_share_to_server(i, 9)
            # Remove server 0, and its contents
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
            # Make happiness unsatisfiable
            c = self.g.clients[0]
            c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
            return c
        d.addCallback(_next)
        d.addCallback(lambda c:
            self.shouldFail(UploadUnhappinessError, "test_query_counting",
                            "1 queries placed some shares",
                            c.upload, upload.Data("data" * 10000,
                                                  convergence="")))
        return d
1653
1654
1655     def test_upper_limit_on_readonly_queries(self):
1656         self.basedir = self.mktemp()
1657         d = self._setup_and_upload()
1658         def _then(ign):
1659             for i in xrange(1, 11):
1660                 self._add_server(server_number=i, readonly=True)
1661             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1662             c = self.g.clients[0]
1663             c.DEFAULT_ENCODING_PARAMETERS['k'] = 2
1664             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1665             c.DEFAULT_ENCODING_PARAMETERS['n'] = 4
1666             return c
1667         d.addCallback(_then)
1668         d.addCallback(lambda client:
1669             self.shouldFail(UploadUnhappinessError,
1670                             "test_upper_limit_on_readonly_queries",
1671                             "sent 8 queries to 8 peers",
1672                             client.upload,
1673                             upload.Data('data' * 10000, convergence="")))
1674         return d
1675
1676
    def test_exception_messages_during_peer_selection(self):
        """
        Drive peer selection into several distinct failure modes and
        verify that each UploadUnhappinessError message reports accurate
        query/placement statistics and uses the appropriate wording.
        """
        # server 1: read-only, no shares
        # server 2: read-only, no shares
        # server 3: read-only, no shares
        # server 4: read-only, no shares
        # server 5: read-only, no shares
        # This will fail, but we want to make sure that the log messages
        # are informative about why it has failed.
        self.basedir = self.mktemp()
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server(server_number=1, readonly=True))
        d.addCallback(lambda ign:
            self._add_server(server_number=2, readonly=True))
        d.addCallback(lambda ign:
            self._add_server(server_number=3, readonly=True))
        d.addCallback(lambda ign:
            self._add_server(server_number=4, readonly=True))
        d.addCallback(lambda ign:
            self._add_server(server_number=5, readonly=True))
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        # Reused before each scenario below; happy defaults to 4 and can
        # be overridden through addCallback's keyword argument.
        def _reset_encoding_parameters(ign, happy=4):
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
            return client
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
                            "placed 0 shares out of 10 "
                            "total (10 homeless), want to place shares on at "
                            "least 4 servers such that any 3 of them have "
                            "enough shares to recover the file, "
                            "sent 5 queries to 5 peers, 0 queries placed "
                            "some shares, 5 placed none "
                            "(of which 5 placed none due to the server being "
                            "full and 0 placed none due to an error)",
                            client.upload,
                            upload.Data("data" * 10000, convergence="")))


        # server 1: read-only, no shares
        # server 2: broken, no shares
        # server 3: read-only, no shares
        # server 4: read-only, no shares
        # server 5: read-only, no shares
        def _reset(ign):
            self.basedir = self.mktemp()
        d.addCallback(_reset)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server(server_number=1, readonly=True))
        d.addCallback(lambda ign:
            self._add_server(server_number=2))
        def _break_server_2(ign):
            server = self.g.servers_by_number[2].my_nodeid
            # We have to break the server in servers_by_id,
            # because the one in servers_by_number isn't wrapped,
            # and doesn't look at its broken attribute when answering
            # queries.
            self.g.servers_by_id[server].broken = True
        d.addCallback(_break_server_2)
        d.addCallback(lambda ign:
            self._add_server(server_number=3, readonly=True))
        d.addCallback(lambda ign:
            self._add_server(server_number=4, readonly=True))
        d.addCallback(lambda ign:
            self._add_server(server_number=5, readonly=True))
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        d.addCallback(_reset_encoding_parameters)
        # The broken server should now be counted in the "due to an
        # error" bucket rather than "server being full".
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
                            "placed 0 shares out of 10 "
                            "total (10 homeless), want to place shares on at "
                            "least 4 servers such that any 3 of them have "
                            "enough shares to recover the file, "
                            "sent 5 queries to 5 peers, 0 queries placed "
                            "some shares, 5 placed none "
                            "(of which 4 placed none due to the server being "
                            "full and 1 placed none due to an error)",
                            client.upload,
                            upload.Data("data" * 10000, convergence="")))
        # server 0, server 1 = empty, accepting shares
        # This should place all of the shares, but still fail with happy=4.
        # We want to make sure that the exception message is worded correctly.
        d.addCallback(_reset)
        d.addCallback(lambda ign:
            self._setup_grid())
        d.addCallback(lambda ign:
            self._add_server(server_number=1))
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
                            "shares could be placed or found on only 2 "
                            "server(s). We were asked to place shares on at "
                            "least 4 server(s) such that any 3 of them have "
                            "enough shares to recover the file.",
                            client.upload, upload.Data("data" * 10000,
                                                       convergence="")))
        # servers 0 - 4 = empty, accepting shares
        # This too should place all the shares, and this too should fail,
        # but since the effective happiness is more than the k encoding
        # parameter, it should trigger a different error message than the one
        # above.
        d.addCallback(_reset)
        d.addCallback(lambda ign:
            self._setup_grid())
        d.addCallback(lambda ign:
            self._add_server(server_number=1))
        d.addCallback(lambda ign:
            self._add_server(server_number=2))
        d.addCallback(lambda ign:
            self._add_server(server_number=3))
        d.addCallback(lambda ign:
            self._add_server(server_number=4))
        d.addCallback(_reset_encoding_parameters, happy=7)
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
                            "shares could be placed on only 5 server(s) such "
                            "that any 3 of them have enough shares to recover "
                            "the file, but we were asked to place shares on "
                            "at least 7 such servers.",
                            client.upload, upload.Data("data" * 10000,
                                                       convergence="")))
        # server 0: shares 0 - 9
        # server 1: share 0, read-only
        # server 2: share 0, read-only
        # server 3: share 0, read-only
        # This should place all of the shares, but fail with happy=7.
        # Since the number of servers with shares is more than the number
        # necessary to reconstitute the file, this will trigger a different
        # error message than either of those above.
        d.addCallback(_reset)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=0,
                                        readonly=True))
        d.addCallback(_reset_encoding_parameters, happy=7)
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
                            "shares could be placed or found on 4 server(s), "
                            "but they are not spread out evenly enough to "
                            "ensure that any 3 of these servers would have "
                            "enough shares to recover the file. We were asked "
                            "to place shares on at least 7 servers such that "
                            "any 3 of them have enough shares to recover the "
                            "file",
                            client.upload, upload.Data("data" * 10000,
                                                       convergence="")))
        return d
1836
1837
1838     def test_problem_layout_comment_187(self):
1839         # #778 comment 187 broke an initial attempt at a share
1840         # redistribution algorithm. This test is here to demonstrate the
1841         # breakage, and to test that subsequent algorithms don't also
1842         # break in the same way.
1843         self.basedir = self.mktemp()
1844         d = self._setup_and_upload(k=2, n=3)
1845
1846         # server 1: shares 0, 1, 2, readonly
1847         # server 2: share 0, readonly
1848         # server 3: share 0
1849         def _setup(ign):
1850             self._add_server_with_share(server_number=1, share_number=0,
1851                                         readonly=True)
1852             self._add_server_with_share(server_number=2, share_number=0,
1853                                         readonly=True)
1854             self._add_server_with_share(server_number=3, share_number=0)
1855             # Copy shares
1856             self._copy_share_to_server(1, 1)
1857             self._copy_share_to_server(2, 1)
1858             # Remove server 0
1859             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1860             client = self.g.clients[0]
1861             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 3
1862             return client
1863
1864         d.addCallback(_setup)
1865         d.addCallback(lambda client:
1866             client.upload(upload.Data("data" * 10000, convergence="")))
1867         d.addCallback(lambda ign:
1868             self.failUnless(self._has_happy_share_distribution()))
1869         return d
1870     test_problem_layout_comment_187.todo = "this isn't fixed yet"
1871
1872     def test_problem_layout_ticket_1118(self):
1873         # #1118 includes a report from a user who hit an assertion in
1874         # the upload code with this layout.
1875         self.basedir = self.mktemp()
1876         d = self._setup_and_upload(k=2, n=4)
1877
1878         # server 0: no shares
1879         # server 1: shares 0, 3
1880         # server 3: share 1
1881         # server 2: share 2
1882         # The order that they get queries is 0, 1, 3, 2
1883         def _setup(ign):
1884             self._add_server(server_number=0)
1885             self._add_server_with_share(server_number=1, share_number=0)
1886             self._add_server_with_share(server_number=2, share_number=2)
1887             self._add_server_with_share(server_number=3, share_number=1)
1888             # Copy shares
1889             self._copy_share_to_server(3, 1)
1890             storedir = self.get_serverdir(0)
1891             # remove the storedir, wiping out any existing shares
1892             shutil.rmtree(storedir)
1893             # create an empty storedir to replace the one we just removed
1894             os.mkdir(storedir)
1895             client = self.g.clients[0]
1896             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1897             return client
1898
1899         d.addCallback(_setup)
1900         # Note: actually it should succeed! See
1901         # test_problem_layout_ticket_1128. But ticket 1118 is just to
1902         # make it realize that it has failed, so if it raises
1903         # UploadUnhappinessError then we'll give it the green light
1904         # for now.
1905         d.addCallback(lambda ignored:
1906             self.shouldFail(UploadUnhappinessError,
1907                             "test_problem_layout_ticket_1118",
1908                             "",
1909                             self.g.clients[0].upload, upload.Data("data" * 10000,
1910                                                        convergence="")))
1911         return d
1912
    def test_problem_layout_ticket_1128(self):
        # Same layout as test_problem_layout_ticket_1118 (originally
        # reported in #1118); ticket #1128 asks that the upload actually
        # succeed with it rather than merely failing cleanly (see the
        # .todo marker below).
        self.basedir = self.mktemp()
        d = self._setup_and_upload(k=2, n=4)

        # server 0: no shares
        # server 1: shares 0, 3
        # server 3: share 1
        # server 2: share 2
        # The order that they get queries is 0, 1, 3, 2
        def _setup(ign):
            self._add_server(server_number=0)
            self._add_server_with_share(server_number=1, share_number=0)
            self._add_server_with_share(server_number=2, share_number=2)
            self._add_server_with_share(server_number=3, share_number=1)
            # Copy shares
            self._copy_share_to_server(3, 1)
            storedir = self.get_serverdir(0)
            # remove the storedir, wiping out any existing shares
            shutil.rmtree(storedir)
            # create an empty storedir to replace the one we just removed
            os.mkdir(storedir)
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
            return client

        d.addCallback(_setup)
        d.addCallback(lambda client:
                          client.upload(upload.Data("data" * 10000, convergence="")))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))
        return d
    test_problem_layout_ticket_1128.todo = "Invent a smarter uploader that uploads successfully in this case."
1947
1948     def test_upload_succeeds_with_some_homeless_shares(self):
1949         # If the upload is forced to stop trying to place shares before
1950         # it has placed (or otherwise accounted) for all of them, but it
1951         # has placed enough to satisfy the upload health criteria that
1952         # we're using, it should still succeed.
1953         self.basedir = self.mktemp()
1954         d = self._setup_and_upload()
1955         def _server_setup(ign):
1956             # Add four servers so that we have a layout like this:
1957             # server 1: share 0, read-only
1958             # server 2: share 1, read-only
1959             # server 3: share 2, read-only
1960             # server 4: share 3, read-only
1961             # If we set happy = 4, the upload will manage to satisfy
1962             # servers of happiness, but not place all of the shares; we
1963             # want to test that the upload is declared successful in
1964             # this case.
1965             self._add_server_with_share(server_number=1, share_number=0,
1966                                         readonly=True)
1967             self._add_server_with_share(server_number=2, share_number=1,
1968                                         readonly=True)
1969             self._add_server_with_share(server_number=3, share_number=2,
1970                                         readonly=True)
1971             self._add_server_with_share(server_number=4, share_number=3,
1972                                         readonly=True)
1973             # Remove server 0.
1974             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1975             # Set the client appropriately
1976             c = self.g.clients[0]
1977             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1978             return c
1979         d.addCallback(_server_setup)
1980         d.addCallback(lambda client:
1981             client.upload(upload.Data("data" * 10000, convergence="")))
1982         d.addCallback(lambda ign:
1983             self.failUnless(self._has_happy_share_distribution()))
1984         return d
1985
1986
1987     def test_uploader_skips_over_servers_with_only_one_share(self):
1988         # We want to make sure that the redistribution logic ignores
1989         # servers with only one share, since placing these shares
1990         # elsewhere will at best keep happiness the same as it was, and
1991         # at worst hurt it.
1992         self.basedir = self.mktemp()
1993         d = self._setup_and_upload()
1994         def _server_setup(ign):
1995             # Add some servers so that the upload will need to
1996             # redistribute, but will first pass over a couple of servers
1997             # that don't have enough shares to redistribute before
1998             # finding one that does have shares to redistribute.
1999             self._add_server_with_share(server_number=1, share_number=0)
2000             self._add_server_with_share(server_number=2, share_number=2)
2001             self._add_server_with_share(server_number=3, share_number=1)
2002             self._add_server_with_share(server_number=8, share_number=4)
2003             self._add_server_with_share(server_number=5, share_number=5)
2004             self._add_server_with_share(server_number=10, share_number=7)
2005             for i in xrange(4):
2006                 self._copy_share_to_server(i, 2)
2007             return self.g.clients[0]
2008         d.addCallback(_server_setup)
2009         d.addCallback(lambda client:
2010             client.upload(upload.Data("data" * 10000, convergence="")))
2011         d.addCallback(lambda ign:
2012             self.failUnless(self._has_happy_share_distribution()))
2013         return d
2014
2015
2016     def test_peer_selector_bucket_abort(self):
2017         # If peer selection for an upload fails due to an unhappy
2018         # layout, the peer selection process should abort the buckets it
2019         # allocates before failing, so that the space can be re-used.
2020         self.basedir = self.mktemp()
2021         self.set_up_grid(num_servers=5)
2022
2023         # Try to upload a file with happy=7, which is unsatisfiable with
2024         # the current grid. This will fail, but should not take up any
2025         # space on the storage servers after it fails.
2026         client = self.g.clients[0]
2027         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
2028         d = defer.succeed(None)
2029         d.addCallback(lambda ignored:
2030             self.shouldFail(UploadUnhappinessError,
2031                             "test_peer_selection_bucket_abort",
2032                             "",
2033                             client.upload, upload.Data("data" * 10000,
2034                                                        convergence="")))
2035         # wait for the abort messages to get there.
2036         def _turn_barrier(res):
2037             return fireEventually(res)
2038         d.addCallback(_turn_barrier)
2039         def _then(ignored):
2040             for server in self.g.servers_by_number.values():
2041                 self.failUnlessEqual(server.allocated_size(), 0)
2042         d.addCallback(_then)
2043         return d
2044
2045
2046     def test_encoder_bucket_abort(self):
2047         # If enough servers die in the process of encoding and uploading
2048         # a file to make the layout unhappy, we should cancel the
2049         # newly-allocated buckets before dying.
2050         self.basedir = self.mktemp()
2051         self.set_up_grid(num_servers=4)
2052
2053         client = self.g.clients[0]
2054         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
2055
2056         d = defer.succeed(None)
2057         d.addCallback(lambda ignored:
2058             self.shouldFail(UploadUnhappinessError,
2059                             "test_encoder_bucket_abort",
2060                             "",
2061                             self._do_upload_with_broken_servers, 1))
2062         def _turn_barrier(res):
2063             return fireEventually(res)
2064         d.addCallback(_turn_barrier)
2065         def _then(ignored):
2066             for server in self.g.servers_by_number.values():
2067                 self.failUnlessEqual(server.allocated_size(), 0)
2068         d.addCallback(_then)
2069         return d
2070
2071
2072     def _set_up_nodes_extra_config(self, clientdir):
2073         cfgfn = os.path.join(clientdir, "tahoe.cfg")
2074         oldcfg = open(cfgfn, "r").read()
2075         f = open(cfgfn, "wt")
2076         f.write(oldcfg)
2077         f.write("\n")
2078         f.write("[client]\n")
2079         f.write("shares.needed = 7\n")
2080         f.write("shares.total = 12\n")
2081         f.write("\n")
2082         f.close()
2083         return None
2084
2085 # TODO:
2086 #  upload with exactly 75 peers (shares_of_happiness)
2087 #  have a download fail
2088 #  cancel a download (need to implement more cancel stuff)
2089
2090 # from test_encode:
2091 # NoNetworkGrid, upload part of ciphertext, kill server, continue upload
2092 # check with Kevan, they want to live in test_upload, existing tests might cover
2093 #     def test_lost_one_shareholder(self): # these are upload-side tests
2094 #     def test_lost_one_shareholder_early(self):
2095 #     def test_lost_many_shareholders(self):
2096 #     def test_lost_all_shareholders(self):