# Source: src/allmydata/test/test_upload.py from tahoe-lafs/tahoe-lafs.git
# (gitweb blob view; commit: "immutable: use PrefixingLogMixin to organize
# logging in Tahoe2PeerSelector and add...")
1 # -*- coding: utf-8 -*-
2
3 import os, shutil
4 from cStringIO import StringIO
5 from twisted.trial import unittest
6 from twisted.python.failure import Failure
7 from twisted.internet import defer
8 from foolscap.api import fireEventually
9
10 import allmydata # for __full_version__
11 from allmydata import uri, monitor, client
12 from allmydata.immutable import upload, encode
13 from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
14 from allmydata.util import log
15 from allmydata.util.assertutil import precondition
16 from allmydata.util.deferredutil import DeferredListShouldSucceed
17 from allmydata.test.no_network import GridTestMixin
18 from allmydata.test.common_util import ShouldFailMixin
19 from allmydata.util.happinessutil import servers_of_happiness, \
20                                          shares_by_server, merge_peers
21 from allmydata.storage_client import StorageFarmBroker
22 from allmydata.storage.server import storage_index_to_dir
23
MiB = 1024*1024 # bytes per mebibyte; used for max_segment_size defaults below
25
def extract_uri(results):
    """Pull the URI string out of an UploadResults-style object."""
    uri_string = results.uri
    return uri_string
28
# Some of these took longer than 480 seconds on Zandr's arm box, but this may
# have been due to an earlier test ERROR'ing out due to timeout, which seems
# to screw up subsequent tests.
timeout = 960 # per-test timeout in seconds, honored by twisted.trial
33
class Uploadable(unittest.TestCase):
    """Exercise the IUploadable wrappers: FileHandle, FileName, and Data."""

    def shouldEqual(self, data, expected):
        # read() hands back a list of strings; flatten and compare
        self.failUnless(isinstance(data, list))
        for chunk in data:
            self.failUnless(isinstance(chunk, str))
        self.failUnlessEqual("".join(data), expected)

    def _exercise(self, u):
        # common pattern for a 41-byte uploadable: check the size, read one
        # byte, then over-ask for 80 and expect the remaining 40, then close
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda ign: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda ign: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda ign: u.close())
        return d

    def test_filehandle_random_key(self):
        return self._test_filehandle(convergence=None)

    def test_filehandle_convergent_encryption(self):
        return self._test_filehandle(convergence="some convergence string")

    def _test_filehandle(self, convergence):
        s = StringIO("a"*41)
        u = upload.FileHandle(s, convergence=convergence)
        d = self._exercise(u) # u.close() doesn't close the filehandle
        d.addCallback(lambda ign: s.close()) # that privilege is reserved for us
        return d

    def test_filename(self):
        basedir = "upload/Uploadable/test_filename"
        os.makedirs(basedir)
        fn = os.path.join(basedir, "file")
        f = open(fn, "w")
        f.write("a"*41)
        f.close()
        return self._exercise(upload.FileName(fn, convergence=None))

    def test_data(self):
        return self._exercise(upload.Data("a"*41, convergence=None))
89
class ServerError(Exception):
    """Raised by FakeStorageServer to simulate a remote server failure."""
92
class SetDEPMixin:
    """Mixin that installs encoding parameters onto the fake client node."""
    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        # replace the fake client's defaults wholesale
        self.node.DEFAULT_ENCODING_PARAMETERS = {
            "k": k,
            "happy": happy,
            "n": n,
            "max_segment_size": max_segsize,
            }
101
class FakeStorageServer:
    # In-memory double for a remote storage server. 'mode' selects the
    # behavior of allocate_buckets(): "good" accepts everything, "full"
    # accepts nothing, "already got them" claims every share is already
    # stored, "small" advertises a 10-byte maximum share size, and
    # "first-fail"/"second-fail" raise ServerError (see allocate_buckets).
    def __init__(self, mode):
        self.mode = mode
        self.allocated = [] # (storage_index, shnum) pairs we accepted
        self.queries = 0 # count of allocate_buckets calls that did NOT raise
        self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                         { "maximum-immutable-share-size": 2**32 },
                         "application-version": str(allmydata.__full_version__),
                         }
        if mode == "small":
            # advertise a tiny maximum share size so large files won't fit
            self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                             { "maximum-immutable-share-size": 10 },
                             "application-version": str(allmydata.__full_version__),
                             }


    def callRemote(self, methname, *args, **kwargs):
        # mimic a foolscap RemoteReference: dispatch to the like-named local
        # method on a later reactor turn
        def _call():
            meth = getattr(self, methname)
            return meth(*args, **kwargs)
        d = fireEventually()
        d.addCallback(lambda res: _call())
        return d

    def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
                         sharenums, share_size, canary):
        #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
        # NOTE: self.queries is advanced only when we do not raise, so a
        # "first-fail" server raises on *every* query (the counter never
        # leaves 0), and a "second-fail" server raises on every query after
        # its first success (the counter sticks at 1).
        if self.mode == "first-fail":
            if self.queries == 0:
                raise ServerError
        if self.mode == "second-fail":
            if self.queries == 1:
                raise ServerError
        self.queries += 1
        if self.mode == "full":
            # accept nothing: no pre-existing shares, no new writers
            return (set(), {},)
        elif self.mode == "already got them":
            # claim we already hold every requested share
            return (set(sharenums), {},)
        else:
            # accept everything: record the allocation and hand back writers
            for shnum in sharenums:
                self.allocated.append( (storage_index, shnum) )
            return (set(),
                    dict([( shnum, FakeBucketWriter(share_size) )
                          for shnum in sharenums]),
                    )
147
class FakeBucketWriter:
    """Diagnostic stand-in for storageserver.BucketWriter, backed by a
    StringIO instead of a real share file."""
    def __init__(self, size):
        self.data = StringIO()
        self.closed = False
        self._size = size

    def callRemote(self, methname, *args, **kwargs):
        # dispatch to the matching remote_* method on a later turn
        d = fireEventually()
        d.addCallback(lambda ign:
                      getattr(self, "remote_" + methname)(*args, **kwargs))
        return d


    def callRemoteOnly(self, methname, *args, **kwargs):
        # fire-and-forget: run the call but discard the resulting Deferred
        self.callRemote(methname, *args, **kwargs)
        return None


    def remote_write(self, offset, data):
        # writes must land inside the declared share size, before close()
        precondition(not self.closed)
        precondition(offset >= 0)
        precondition(offset+len(data) <= self._size,
                     "offset=%d + data=%d > size=%d" %
                     (offset, len(data), self._size))
        self.data.seek(offset)
        self.data.write(data)

    def remote_close(self):
        precondition(not self.closed)
        self.closed = True

    def remote_abort(self):
        pass
185
class FakeClient:
    """Minimal client double: a StorageFarmBroker populated with
    FakeStorageServers, one per fake server number."""
    DEFAULT_ENCODING_PARAMETERS = {"k":25,
                                   "happy": 25,
                                   "n": 100,
                                   "max_segment_size": 1*MiB,
                                   }
    def __init__(self, mode="good", num_servers=50):
        self.num_servers = num_servers
        # a bare string applies one mode to every server; otherwise 'mode'
        # maps server number -> mode string
        if type(mode) is str:
            mode = dict([(i, mode) for i in range(num_servers)])
        self.storage_broker = StorageFarmBroker(None, permute_peers=True)
        self.last_peers = []
        for fakeid in range(self.num_servers):
            server = FakeStorageServer(mode[fakeid])
            # real serverids are 20-byte strings; fake that width
            self.storage_broker.test_add_server("%20d" % fakeid, server)
            self.last_peers.append(server)

    def log(self, *args, **kwargs):
        pass
    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS
    def get_storage_broker(self):
        return self.storage_broker
    _secret_holder = client.SecretHolder("lease secret", "convergence secret")
210
class GotTooFarError(Exception):
    """Raised by GiganticUploadable when too much data has been read."""
213
class GiganticUploadable(upload.FileHandle):
    """An uploadable that pretends to hold 'size' NUL bytes without actually
    storing them; aborts the test if anyone reads past the first megabyte."""
    def __init__(self, size):
        self._size = size
        self._fp = 0 # current read position

    def get_encryption_key(self):
        return defer.succeed("\x00" * 16)
    def get_size(self):
        return defer.succeed(self._size)
    def read(self, length):
        remaining = self._size - self._fp
        length = min(remaining, length)
        self._fp += length
        if self._fp > 1000000:
            # terminate the test early: nobody should read this much
            raise GotTooFarError("we shouldn't be allowed to get this far")
        return defer.succeed(["\x00" * length])
    def close(self):
        pass
233
# Test payload; the assert below guarantees it is large enough to be stored
# as a CHK file rather than packed inline into a LIT URI.
DATA = """
Once upon a time, there was a beautiful princess named Buttercup. She lived
in a magical land where every file was stored securely among millions of
machines, and nobody ever worried about their data being lost ever again.
The End.
"""
assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD

# sizes used by the GoodServer tests: zero and "small" yield LIT URIs,
# "large" yields a CHK URI
SIZE_ZERO = 0
SIZE_SMALL = 16
SIZE_LARGE = len(DATA)
245
def upload_data(uploader, data):
    """Upload a raw string via a non-convergent upload.Data wrapper."""
    return uploader.upload(upload.Data(data, convergence=None))
def upload_filename(uploader, filename):
    """Upload a file on disk via a non-convergent upload.FileName wrapper."""
    return uploader.upload(upload.FileName(filename, convergence=None))
def upload_filehandle(uploader, fh):
    """Upload an open file object via a non-convergent upload.FileHandle."""
    return uploader.upload(upload.FileHandle(fh, convergence=None))
255
class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
    """Uploads of various sizes and source types (Data, filehandle, filename)
    against a grid of all-accepting fake servers."""
    def setUp(self):
        self.node = FakeClient(mode="good")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _check_small(self, newuri, size):
        # small files are stored inline as LIT URIs
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.LiteralFileURI))
        self.failUnlessEqual(len(u.data), size)

    def _check_large(self, newuri, size):
        # large files are erasure-coded: CHK URI with a 16-byte storage
        # index and a 16-byte key
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def get_data(self, size):
        # a size-byte prefix of the canned test data
        return DATA[:size]

    def test_too_large(self):
        # we've removed the 4GiB share size limit (see ticket #346 for
        # details), but still have an 8-byte field, so the limit is now
        # 2**64, so make sure we reject files larger than that.
        k = 3; happy = 7; n = 10
        self.set_encoding_parameters(k, happy, n)
        big = k*(2**64)
        data1 = GiganticUploadable(big)
        d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
                            "This file is too large to be uploaded (data_size)",
                            self.u.upload, data1)
        data2 = GiganticUploadable(big-3)
        d.addCallback(lambda res:
                      self.shouldFail(FileTooLargeError,
                                      "test_too_large-data2",
                                      "This file is too large to be uploaded (offsets)",
                                      self.u.upload, data2))
        # I don't know where the actual limit is.. it depends upon how large
        # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
        return d

    def test_data_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_data_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_data_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_data_large_odd_segments(self):
        data = self.get_data(SIZE_LARGE)
        segsize = int(SIZE_LARGE / 2.5)
        # we want 3 segments, since that's not a power of two
        self.set_encoding_parameters(25, 25, 100, segsize)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_filehandle_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_filehandle_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_filehandle_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_filename_zero(self):
        # filename-based uploads write a real file into the cwd first
        fn = "Uploader-test_filename_zero.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_ZERO)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_filename_small(self):
        fn = "Uploader-test_filename_small.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_SMALL)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_filename_large(self):
        fn = "Uploader-test_filename_large.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_LARGE)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d
385
class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
    """Uploads against servers that raise ServerError from allocate_buckets.
    See FakeStorageServer.allocate_buckets for the exact first-fail /
    second-fail semantics these tests rely on."""
    def make_node(self, mode, num_servers=10):
        self.node = FakeClient(mode, num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _check_large(self, newuri, size):
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def test_first_error(self):
        # one good server is enough when happy=1
        mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
        self.make_node(mode)
        self.set_encoding_parameters(k=25, happy=1, n=50)
        d = upload_data(self.u, DATA)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_first_error_all(self):
        # every server errors on every query, so no shares can be placed
        self.make_node("first-fail")
        d = self.shouldFail(UploadUnhappinessError, "first_error_all",
                            "peer selection failed",
                            upload_data, self.u, DATA)
        # NOTE: Python-2 tuple-parameter unpacking; shouldFail delivers a
        # one-element tuple holding the Failure
        def _check((f,)):
            self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
            # there should also be a 'last failure was' message
            self.failUnlessIn("ServerError", str(f.value))
        d.addCallback(_check)
        return d

    def test_second_error(self):
        # we want to make sure we make it to a third pass. This means that
        # the first pass was insufficient to place all shares, and at least
        # one of second pass servers (other than the last one) accepted a
        # share (so we'll believe that a third pass will be useful). (if
        # everyone but the last server throws an error, then we'll send all
        # the remaining shares to the last server at the end of the second
        # pass, and if that succeeds, we won't make it to a third pass).
        #
        # we can achieve this 97.5% of the time by using 40 servers, having
        # 39 of them fail on the second request, leaving only one to succeed
        # on the second request. (we need to keep the number of servers low
        # enough to ensure a second pass with 100 shares).
        mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
        self.make_node(mode, 40)
        d = upload_data(self.u, DATA)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_second_error_all(self):
        # all 10 servers accept their first query (10 shares placed), then
        # error on every later query
        self.make_node("second-fail")
        d = self.shouldFail(UploadUnhappinessError, "second_error_all",
                            "peer selection failed",
                            upload_data, self.u, DATA)
        def _check((f,)):
            self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
            # there should also be a 'last failure was' message
            self.failUnlessIn("ServerError", str(f.value))
        d.addCallback(_check)
        return d
454
class FullServer(unittest.TestCase):
    """An upload against a grid where every server is full must fail with
    UploadUnhappinessError."""
    def setUp(self):
        self.node = FakeClient(mode="full")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _should_fail(self, f):
        # anything other than a Failure wrapping UploadUnhappinessError
        # (including success) is a test failure
        self.failUnless(isinstance(f, Failure) and f.check(UploadUnhappinessError), f)

    def test_data_large(self):
        d = upload_data(self.u, DATA)
        d.addBoth(self._should_fail)
        return d
470
class PeerSelection(unittest.TestCase):
    """Verify how Tahoe2PeerSelector spreads shares across fake servers."""

    def make_client(self, num_servers=50):
        self.node = FakeClient(mode="good", num_servers=num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def get_data(self, size):
        return DATA[:size]

    def _check_large(self, newuri, size):
        # sanity-check the resulting CHK URI
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    # NOTE(review): this duplicates SetDEPMixin.set_encoding_parameters;
    # the class could mix that in instead
    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        p = {"k": k,
             "happy": happy,
             "n": n,
             "max_segment_size": max_segsize,
             }
        self.node.DEFAULT_ENCODING_PARAMETERS = p

    def test_one_each(self):
        # if we have 50 shares, and there are 50 peers, and they all accept a
        # share, we should get exactly one share per peer

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(25, 30, 50)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 1)
                self.failUnlessEqual(p.queries, 1)
        d.addCallback(_check)
        return d

    def test_two_each(self):
        # if we have 100 shares, and there are 50 peers, and they all accept
        # all shares, we should get exactly two shares per peer

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        # if there are 50 peers, then happy needs to be <= 50
        self.set_encoding_parameters(50, 50, 100)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 2)
                self.failUnlessEqual(p.queries, 2)
        d.addCallback(_check)
        return d

    def test_one_each_plus_one_extra(self):
        # if we have 51 shares, and there are 50 peers, then one peer gets
        # two shares and the rest get just one

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(24, 41, 51)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            got_one = []
            got_two = []
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnless(len(allocated) in (1,2), len(allocated))
                if len(allocated) == 1:
                    self.failUnlessEqual(p.queries, 1)
                    got_one.append(p)
                else:
                    self.failUnlessEqual(p.queries, 2)
                    got_two.append(p)
            self.failUnlessEqual(len(got_one), 49)
            self.failUnlessEqual(len(got_two), 1)
        d.addCallback(_check)
        return d

    def test_four_each(self):
        # if we have 200 shares, and there are 50 peers, then each peer gets
        # 4 shares. The design goal is to accomplish this with only two
        # queries per peer.

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        # if there are 50 peers, then happy should be no more than 50 if
        # we want this to work.
        self.set_encoding_parameters(100, 50, 200)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 4)
                self.failUnlessEqual(p.queries, 2)
        d.addCallback(_check)
        return d

    def test_three_of_ten(self):
        # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
        # 4+4+2

        self.make_client(3)
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 3, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            # histogram[i] counts servers holding exactly i shares
            counts = {}
            for p in self.node.last_peers:
                allocated = p.allocated
                counts[len(allocated)] = counts.get(len(allocated), 0) + 1
            histogram = [counts.get(i, 0) for i in range(5)]
            self.failUnlessEqual(histogram, [0,0,0,2,1])
        d.addCallback(_check)
        return d

    def test_some_big_some_small(self):
        # 10 shares, 20 servers, but half the servers don't support a
        # share-size large enough for our file
        mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)])
        self.node = FakeClient(mode, num_servers=20)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 5, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            # we should have put one share each on the big peers, and zero
            # shares on the small peers
            total_allocated = 0
            for p in self.node.last_peers:
                if p.mode == "good":
                    self.failUnlessEqual(len(p.allocated), 1)
                elif p.mode == "small":
                    self.failUnlessEqual(len(p.allocated), 0)
                total_allocated += len(p.allocated)
            self.failUnlessEqual(total_allocated, 10)
        d.addCallback(_check)
        return d
631
632
class StorageIndex(unittest.TestCase):
    """The storage index must depend on the convergence secret, the random
    key (when convergence is disabled), and the encoding parameters."""
    def test_params_must_matter(self):
        DATA = "I am some data"
        u = upload.Data(DATA, convergence="")
        eu = upload.EncryptAnUploadable(u)
        d1 = eu.get_storage_index()

        # CHK means the same data should encrypt the same way
        u = upload.Data(DATA, convergence="")
        eu = upload.EncryptAnUploadable(u)
        d1a = eu.get_storage_index()

        # but if we use a different convergence string it should be different
        u = upload.Data(DATA, convergence="wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt1 = eu.get_storage_index()

        # and if we add yet a different convergence it should be different again
        u = upload.Data(DATA, convergence="NOT wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt2 = eu.get_storage_index()

        # and if we use the first string again it should be the same as last time
        u = upload.Data(DATA, convergence="wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt1a = eu.get_storage_index()

        # and if we change the encoding parameters, it should be different (from the same convergence string with different encoding parameters)
        u = upload.Data(DATA, convergence="")
        u.encoding_param_k = u.default_encoding_param_k + 1
        eu = upload.EncryptAnUploadable(u)
        d2 = eu.get_storage_index()

        # and if we use a random key, it should be different than the CHK
        u = upload.Data(DATA, convergence=None)
        eu = upload.EncryptAnUploadable(u)
        d3 = eu.get_storage_index()
        # and different from another instance
        u = upload.Data(DATA, convergence=None)
        eu = upload.EncryptAnUploadable(u)
        d4 = eu.get_storage_index()

        # gather all eight deferred storage indexes, then compare
        d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
        def _done(res):
            si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
            self.failUnlessEqual(si1, si1a)
            self.failIfEqual(si1, si2)
            self.failIfEqual(si1, si3)
            self.failIfEqual(si1, si4)
            self.failIfEqual(si3, si4)
            self.failIfEqual(si1salt1, si1)
            self.failIfEqual(si1salt1, si1salt2)
            self.failIfEqual(si1salt2, si1)
            self.failUnlessEqual(si1salt1, si1salt1a)
        d.addCallback(_done)
        return d
689
# copied from python docs because itertools.combinations was added in
# python 2.6 and we support >= 2.4.
def combinations(iterable, r):
    """Yield all r-length subsequences of 'iterable' as tuples, in
    lexicographic order of element positions.

    combinations('ABCD', 2) --> AB AC AD BC BD CD
    combinations(range(4), 3) --> 012 013 023 123
    """
    pool = tuple(iterable)
    n = len(pool)
    if r > n:
        return
    # list(range(r)) rather than bare range(r): we mutate this below, so we
    # need a real list (bare range only happens to work on Python 2)
    indices = list(range(r))
    yield tuple(pool[i] for i in indices)
    while True:
        # find the rightmost index that can still be advanced
        for i in reversed(range(r)):
            if indices[i] != i + n - r:
                break
        else:
            return # every index is at its maximum: we're done
        indices[i] += 1
        # reset every index to the right of position i
        for j in range(i+1, r):
            indices[j] = indices[j-1] + 1
        yield tuple(pool[i] for i in indices)
711
def is_happy_enough(servertoshnums, h, k):
    """Return True if the server-to-set-of-sharenums map 'servertoshnums'
    achieves happiness level h for encoding parameter k.

    I use a naive brute-force search over all h-sized server subsets. (See
    src/allmydata/util/happinessutil.py for a better algorithm.)
    """
    # the debug print that used to live here spammed every test run; removed
    if len(servertoshnums) < h:
        return False
    # every k-server subset of every candidate h-server "happy set" must
    # jointly hold at least k distinct shares, or that subset could not
    # reconstruct the file on its own
    for happysetcombo in combinations(servertoshnums.iterkeys(), h):
        for subsetcombo in combinations(happysetcombo, k):
            shnums = set()
            for s in subsetcombo:
                shnums |= set(servertoshnums[s])
            if len(shnums) < k:
                return False
    return True
728
729 class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
730     ShouldFailMixin):
731     def find_all_shares(self, unused=None):
732         """Locate shares on disk. Returns a dict that maps
733         server to set of sharenums.
734         """
735         assert self.g, "I tried to find a grid at self.g, but failed"
736         servertoshnums = {} # k: server, v: set(shnum)
737
738         for i, c in self.g.servers_by_number.iteritems():
739             for (dirp, dirns, fns) in os.walk(c.sharedir):
740                 for fn in fns:
741                     try:
742                         sharenum = int(fn)
743                     except TypeError:
744                         # Whoops, I guess that's not a share file then.
745                         pass
746                     else:
747                         servertoshnums.setdefault(i, set()).add(sharenum)
748
749         return servertoshnums
750
    def _do_upload_with_broken_servers(self, servers_to_break):
        """
        I act like a normal upload, but before I send the results of
        Tahoe2PeerSelector to the Encoder, I break the first servers_to_break
        PeerTrackers in the upload_servers part of the return result.
        """
        assert self.g, "I tried to find a grid at self.g, but failed"
        broker = self.g.clients[0].storage_broker
        sh     = self.g.clients[0]._secret_holder
        data = upload.Data("data" * 10000, convergence="")
        # k=3/happy=4/n=10 for this upload
        data.encoding_param_k = 3
        data.encoding_param_happy = 4
        data.encoding_param_n = 10
        uploadable = upload.EncryptAnUploadable(data)
        encoder = encode.Encoder()
        encoder.set_encrypted_uploadable(uploadable)
        status = upload.UploadStatus()
        selector = upload.Tahoe2PeerSelector("dglev", "test", status)
        storage_index = encoder.get_param("storage_index")
        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        d = selector.get_shareholders(broker, sh, storage_index,
                                      share_size, block_size, num_segments,
                                      10, 3, 4)
        # py2 tuple-parameter: get_shareholders fires with
        # (upload_servers, already_peers)
        def _have_shareholders((upload_servers, already_peers)):
            assert servers_to_break <= len(upload_servers)
            # abort every bucket on the first servers_to_break trackers,
            # simulating servers that died after accepting shares
            for index in xrange(servers_to_break):
                server = list(upload_servers)[index]
                for share in server.buckets.keys():
                    server.buckets[share].abort()
            # then hand the (partially broken) shareholder map to the encoder
            buckets = {}
            servermap = already_peers.copy()
            for peer in upload_servers:
                buckets.update(peer.buckets)
                for bucket in peer.buckets:
                    servermap.setdefault(bucket, set()).add(peer.peerid)
            encoder.set_shareholders(buckets, servermap)
            d = encoder.start()
            return d
        d.addCallback(_have_shareholders)
        return d
793
794     def _has_happy_share_distribution(self):
795         servertoshnums = self.find_all_shares()
796         k = self.g.clients[0].DEFAULT_ENCODING_PARAMETERS['k']
797         h = self.g.clients[0].DEFAULT_ENCODING_PARAMETERS['happy']
798         return is_happy_enough(servertoshnums, h, k)
799
800     def _add_server(self, server_number, readonly=False):
801         assert self.g, "I tried to find a grid at self.g, but failed"
802         ss = self.g.make_server(server_number, readonly)
803         log.msg("just created a server, number: %s => %s" % (server_number, ss,))
804         self.g.add_server(server_number, ss)
805
806     def _add_server_with_share(self, server_number, share_number=None,
807                                readonly=False):
808         self._add_server(server_number, readonly)
809         if share_number is not None:
810             self._copy_share_to_server(share_number, server_number)
811
812
813     def _copy_share_to_server(self, share_number, server_number):
814         ss = self.g.servers_by_number[server_number]
815         # Copy share i from the directory associated with the first
816         # storage server to the directory associated with this one.
817         assert self.g, "I tried to find a grid at self.g, but failed"
818         assert self.shares, "I tried to find shares at self.shares, but failed"
819         old_share_location = self.shares[share_number][2]
820         new_share_location = os.path.join(ss.storedir, "shares")
821         si = uri.from_string(self.uri).get_storage_index()
822         new_share_location = os.path.join(new_share_location,
823                                           storage_index_to_dir(si))
824         if not os.path.exists(new_share_location):
825             os.makedirs(new_share_location)
826         new_share_location = os.path.join(new_share_location,
827                                           str(share_number))
828         if old_share_location != new_share_location:
829             shutil.copy(old_share_location, new_share_location)
830         shares = self.find_uri_shares(self.uri)
831         # Make sure that the storage server has the share.
832         self.failUnless((share_number, ss.my_nodeid, new_share_location)
833                         in shares)
834
835     def _setup_grid(self):
836         """
837         I set up a NoNetworkGrid with a single server and client.
838         """
839         self.set_up_grid(num_clients=1, num_servers=1)
840
841     def _setup_and_upload(self, **kwargs):
842         """
843         I set up a NoNetworkGrid with a single server and client,
844         upload a file to it, store its uri in self.uri, and store its
845         sharedata in self.shares.
846         """
847         self._setup_grid()
848         client = self.g.clients[0]
849         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
850         if "n" in kwargs and "k" in kwargs:
851             client.DEFAULT_ENCODING_PARAMETERS['k'] = kwargs['k']
852             client.DEFAULT_ENCODING_PARAMETERS['n'] = kwargs['n']
853         data = upload.Data("data" * 10000, convergence="")
854         self.data = data
855         d = client.upload(data)
856         def _store_uri(ur):
857             self.uri = ur.uri
858         d.addCallback(_store_uri)
859         d.addCallback(lambda ign:
860             self.find_uri_shares(self.uri))
861         def _store_shares(shares):
862             self.shares = shares
863         d.addCallback(_store_shares)
864         return d
865
866     def test_configure_parameters(self):
867         self.basedir = self.mktemp()
868         hooks = {0: self._set_up_nodes_extra_config}
869         self.set_up_grid(client_config_hooks=hooks)
870         c0 = self.g.clients[0]
871
872         DATA = "data" * 100
873         u = upload.Data(DATA, convergence="")
874         d = c0.upload(u)
875         d.addCallback(lambda ur: c0.create_node_from_uri(ur.uri))
876         m = monitor.Monitor()
877         d.addCallback(lambda fn: fn.check(m))
878         def _check(cr):
879             data = cr.get_data()
880             self.failUnlessEqual(data["count-shares-needed"], 7)
881             self.failUnlessEqual(data["count-shares-expected"], 12)
882         d.addCallback(_check)
883         return d
884
885
886     def _setUp(self, ns):
887         # Used by test_happy_semantics and test_preexisting_share_behavior
888         # to set up the grid.
889         self.node = FakeClient(mode="good", num_servers=ns)
890         self.u = upload.Uploader()
891         self.u.running = True
892         self.u.parent = self.node
893
894
895     def test_happy_semantics(self):
896         self._setUp(2)
897         DATA = upload.Data("kittens" * 10000, convergence="")
898         # These parameters are unsatisfiable with only 2 servers.
899         self.set_encoding_parameters(k=3, happy=5, n=10)
900         d = self.shouldFail(UploadUnhappinessError, "test_happy_semantics",
901                             "shares could be placed or found on only 2 "
902                             "server(s). We were asked to place shares on "
903                             "at least 5 server(s) such that any 3 of them "
904                             "have enough shares to recover the file",
905                             self.u.upload, DATA)
906         # Let's reset the client to have 10 servers
907         d.addCallback(lambda ign:
908             self._setUp(10))
909         # These parameters are satisfiable with 10 servers.
910         d.addCallback(lambda ign:
911             self.set_encoding_parameters(k=3, happy=5, n=10))
912         d.addCallback(lambda ign:
913             self.u.upload(DATA))
914         # Let's reset the client to have 7 servers
915         # (this is less than n, but more than h)
916         d.addCallback(lambda ign:
917             self._setUp(7))
918         # These parameters are satisfiable with 7 servers.
919         d.addCallback(lambda ign:
920             self.set_encoding_parameters(k=3, happy=5, n=10))
921         d.addCallback(lambda ign:
922             self.u.upload(DATA))
923         return d
924
925     def test_aborted_shares(self):
926         self.basedir = "upload/EncodingParameters/aborted_shares"
927         self.set_up_grid(num_servers=4)
928         c = self.g.clients[0]
929         DATA = upload.Data(100* "kittens", convergence="")
930         # These parameters are unsatisfiable with only 4 servers, but should
931         # work with 5, as long as the original 4 are not stuck in the open
932         # BucketWriter state (open() but not
933         parms = {"k":2, "happy":5, "n":5, "max_segment_size": 1*MiB}
934         c.DEFAULT_ENCODING_PARAMETERS = parms
935         d = self.shouldFail(UploadUnhappinessError, "test_aborted_shares",
936                             "shares could be placed on only 4 "
937                             "server(s) such that any 2 of them have enough "
938                             "shares to recover the file, but we were asked "
939                             "to place shares on at least 5 such servers",
940                             c.upload, DATA)
941         # now add the 5th server
942         d.addCallback(lambda ign: self._add_server(4, False))
943         # and this time the upload ought to succeed
944         d.addCallback(lambda ign: c.upload(DATA))
945         d.addCallback(lambda ign:
946             self.failUnless(self._has_happy_share_distribution()))
947         return d
948
949
    def test_problem_layout_comment_52(self):
        def _basedir():
            # Fresh basedir so the second scenario below gets its own grid.
            self.basedir = self.mktemp()
        _basedir()
        # This scenario is at
        # http://allmydata.org/trac/tahoe/ticket/778#comment:52
        #
        # The scenario in comment:52 proposes that we have a layout
        # like:
        # server 0: shares 1 - 9
        # server 1: share 0, read-only
        # server 2: share 0, read-only
        # server 3: share 0, read-only
        # To get access to the shares, we will first upload to one
        # server, which will then have shares 0 - 9. We'll then
        # add three new servers, configure them to not accept any new
        # shares, then write share 0 directly into the serverdir of each,
        # and then remove share 0 from server 0 in the same way.
        # Then each of servers 1 - 3 will report that they have share 0,
        # and will not accept any new share, while server 0 will report that
        # it has shares 1 - 9 and will accept new shares.
        # We'll then set 'happy' = 4, and see that an upload fails
        # (as it should)
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=0,
                                        readonly=True))
        # Remove the first share from server 0.
        def _remove_share_0_from_server_0():
            share_location = self.shares[0][2]
            os.remove(share_location)
        d.addCallback(lambda ign:
            _remove_share_0_from_server_0())
        # Set happy = 4 in the client.
        def _prepare():
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
            return client
        d.addCallback(lambda ign:
            _prepare())
        # Uploading data should fail
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError,
                            "test_problem_layout_comment_52_test_1",
                            "shares could be placed or found on 4 server(s), "
                            "but they are not spread out evenly enough to "
                            "ensure that any 3 of these servers would have "
                            "enough shares to recover the file. "
                            "We were asked to place shares on at "
                            "least 4 servers such that any 3 of them have "
                            "enough shares to recover the file",
                            client.upload, upload.Data("data" * 10000,
                                                       convergence="")))

        # Do comment:52, but like this:
        # server 2: empty
        # server 3: share 0, read-only
        # server 1: share 0, read-only
        # server 0: shares 0-9
        d.addCallback(lambda ign:
            _basedir())
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server(server_number=2))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=0,
                                        readonly=True))
        # Again raise happy to 4 before retrying the upload.
        def _prepare2():
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
            return client
        d.addCallback(lambda ign:
            _prepare2())
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError,
                            "test_problem_layout_comment_52_test_2",
                            "shares could be placed on only 3 server(s) such "
                            "that any 3 of them have enough shares to recover "
                            "the file, but we were asked to place shares on "
                            "at least 4 such servers.",
                            client.upload, upload.Data("data" * 10000,
                                                       convergence="")))
        return d
1043
1044
    def test_problem_layout_comment_53(self):
        # This scenario is at
        # http://allmydata.org/trac/tahoe/ticket/778#comment:53
        #
        # Set up the grid to have one server
        def _change_basedir(ign):
            # Each of the three scenarios below gets its own grid.
            self.basedir = self.mktemp()
        _change_basedir(None)
        # We start by uploading all of the shares to one server.
        # Next, we'll add three new servers to our NoNetworkGrid. We'll add
        # one share from our initial upload to each of these.
        # The counterintuitive ordering of the share numbers is to deal with
        # the permuting of these servers -- distributing the shares this
        # way ensures that the Tahoe2PeerSelector sees them in the order
        # described below.
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1))
        # So, we now have the following layout:
        # server 0: shares 0 - 9
        # server 1: share 2
        # server 2: share 0
        # server 3: share 1
        # We change the 'happy' parameter in the client to 4.
        # The Tahoe2PeerSelector will see the peers permuted as:
        # 2, 3, 1, 0
        # Ideally, a reupload of our original data should work.
        def _reset_encoding_parameters(ign, happy=4):
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
            return client
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))


        # This scenario is basically comment:53, but changed so that the
        # Tahoe2PeerSelector sees the server with all of the shares before
        # any of the other servers.
        # The layout is:
        # server 2: shares 0 - 9
        # server 3: share 0
        # server 1: share 1
        # server 4: share 2
        # The Tahoe2PeerSelector sees the peers permuted as:
        # 2, 3, 1, 4
        # Note that server 0 has been replaced by server 4; this makes it
        # easier to ensure that the last server seen by Tahoe2PeerSelector
        # has only one share.
        d.addCallback(_change_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2))
        # Copy all of the other shares to server number 2
        def _copy_shares(ign):
            for i in xrange(0, 10):
                self._copy_share_to_server(i, 2)
        d.addCallback(_copy_shares)
        # Remove the first server, and add a placeholder with share 0
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=4, share_number=0))
        # Now try uploading.
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))


        # Try the same thing, but with empty servers after the first one
        # We want to make sure that Tahoe2PeerSelector will redistribute
        # shares as necessary, not simply discover an existing layout.
        # The layout is:
        # server 2: shares 0 - 9
        # server 3: empty
        # server 1: empty
        # server 4: empty
        d.addCallback(_change_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server(server_number=2))
        d.addCallback(lambda ign:
            self._add_server(server_number=3))
        d.addCallback(lambda ign:
            self._add_server(server_number=1))
        d.addCallback(lambda ign:
            self._add_server(server_number=4))
        d.addCallback(_copy_shares)
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        # Make sure that only as many shares as necessary to satisfy
        # servers of happiness were pushed.
        # NOTE(review): 3 here presumably means one new share for each of
        # the three empty servers -- confirm against pushed_shares'
        # definition in upload.py.
        d.addCallback(lambda results:
            self.failUnlessEqual(results.pushed_shares, 3))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))
        return d
1159
    def test_problem_layout_ticket1124(self):
        self.basedir = self.mktemp()
        d = self._setup_and_upload(k=2, n=4)

        # server 0: shares 0, 1, 2, 3
        # server 1: shares 0, 3
        # server 2: share 1
        # server 3: share 2
        # With this layout, an upload should just be satisfied that the current distribution is good enough, right?
        def _setup(ign):
            # Build the layout above: three single-share servers, then
            # also copy share 3 onto server 1 so it holds two shares.
            self._add_server_with_share(server_number=0, share_number=None)
            self._add_server_with_share(server_number=1, share_number=0)
            self._add_server_with_share(server_number=2, share_number=1)
            self._add_server_with_share(server_number=3, share_number=2)
            # Copy shares
            self._copy_share_to_server(3, 1)
            client = self.g.clients[0]
            # Require happiness 4 for the re-upload.
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
            return client

        d.addCallback(_setup)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))
        return d
    # Marked .todo: trial expects this test to fail until ticket #1124 is
    # addressed.
    test_problem_layout_ticket1124.todo = "Fix this after 1.7.1 release."
1187
1188     def test_happiness_with_some_readonly_peers(self):
1189         # Try the following layout
1190         # server 2: shares 0-9
1191         # server 4: share 0, read-only
1192         # server 3: share 1, read-only
1193         # server 1: share 2, read-only
1194         self.basedir = self.mktemp()
1195         d = self._setup_and_upload()
1196         d.addCallback(lambda ign:
1197             self._add_server_with_share(server_number=2, share_number=0))
1198         d.addCallback(lambda ign:
1199             self._add_server_with_share(server_number=3, share_number=1,
1200                                         readonly=True))
1201         d.addCallback(lambda ign:
1202             self._add_server_with_share(server_number=1, share_number=2,
1203                                         readonly=True))
1204         # Copy all of the other shares to server number 2
1205         def _copy_shares(ign):
1206             for i in xrange(1, 10):
1207                 self._copy_share_to_server(i, 2)
1208         d.addCallback(_copy_shares)
1209         # Remove server 0, and add another in its place
1210         d.addCallback(lambda ign:
1211             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1212         d.addCallback(lambda ign:
1213             self._add_server_with_share(server_number=4, share_number=0,
1214                                         readonly=True))
1215         def _reset_encoding_parameters(ign, happy=4):
1216             client = self.g.clients[0]
1217             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1218             return client
1219         d.addCallback(_reset_encoding_parameters)
1220         d.addCallback(lambda client:
1221             client.upload(upload.Data("data" * 10000, convergence="")))
1222         d.addCallback(lambda ign:
1223             self.failUnless(self._has_happy_share_distribution()))
1224         return d
1225
1226
    def test_happiness_with_all_readonly_peers(self):
        # server 3: share 1, read-only
        # server 1: share 2, read-only
        # server 2: shares 0-9, read-only
        # server 4: share 0, read-only
        # The idea with this test is to make sure that the survey of
        # read-only peers doesn't undercount servers of happiness
        self.basedir = self.mktemp()
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=4, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0,
                                        readonly=True))
        # Fill server 2 with the rest of the shares, so it holds 0-9.
        def _copy_shares(ign):
            for i in xrange(1, 10):
                self._copy_share_to_server(i, 2)
        d.addCallback(_copy_shares)
        # Drop the original server so only the four read-only servers remain.
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        # Require happiness 4; the preexisting shares alone must satisfy it.
        def _reset_encoding_parameters(ign, happy=4):
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
            return client
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))
        return d
1264
1265
1266     def test_dropped_servers_in_encoder(self):
1267         # The Encoder does its own "servers_of_happiness" check if it
1268         # happens to lose a bucket during an upload (it assumes that
1269         # the layout presented to it satisfies "servers_of_happiness"
1270         # until a failure occurs)
1271         #
1272         # This test simulates an upload where servers break after peer
1273         # selection, but before they are written to.
1274         def _set_basedir(ign=None):
1275             self.basedir = self.mktemp()
1276         _set_basedir()
1277         d = self._setup_and_upload();
1278         # Add 5 servers
1279         def _do_server_setup(ign):
1280             self._add_server(server_number=1)
1281             self._add_server(server_number=2)
1282             self._add_server(server_number=3)
1283             self._add_server(server_number=4)
1284             self._add_server(server_number=5)
1285         d.addCallback(_do_server_setup)
1286         # remove the original server
1287         # (necessary to ensure that the Tahoe2PeerSelector will distribute
1288         #  all the shares)
1289         def _remove_server(ign):
1290             server = self.g.servers_by_number[0]
1291             self.g.remove_server(server.my_nodeid)
1292         d.addCallback(_remove_server)
1293         # This should succeed; we still have 4 servers, and the
1294         # happiness of the upload is 4.
1295         d.addCallback(lambda ign:
1296             self._do_upload_with_broken_servers(1))
1297         # Now, do the same thing over again, but drop 2 servers instead
1298         # of 1. This should fail, because servers_of_happiness is 4 and
1299         # we can't satisfy that.
1300         d.addCallback(_set_basedir)
1301         d.addCallback(lambda ign:
1302             self._setup_and_upload())
1303         d.addCallback(_do_server_setup)
1304         d.addCallback(_remove_server)
1305         d.addCallback(lambda ign:
1306             self.shouldFail(UploadUnhappinessError,
1307                             "test_dropped_servers_in_encoder",
1308                             "shares could be placed on only 3 server(s) "
1309                             "such that any 3 of them have enough shares to "
1310                             "recover the file, but we were asked to place "
1311                             "shares on at least 4",
1312                             self._do_upload_with_broken_servers, 2))
1313         # Now do the same thing over again, but make some of the servers
1314         # readonly, break some of the ones that aren't, and make sure that
1315         # happiness accounting is preserved.
1316         d.addCallback(_set_basedir)
1317         d.addCallback(lambda ign:
1318             self._setup_and_upload())
1319         def _do_server_setup_2(ign):
1320             self._add_server(1)
1321             self._add_server(2)
1322             self._add_server(3)
1323             self._add_server_with_share(4, 7, readonly=True)
1324             self._add_server_with_share(5, 8, readonly=True)
1325         d.addCallback(_do_server_setup_2)
1326         d.addCallback(_remove_server)
1327         d.addCallback(lambda ign:
1328             self._do_upload_with_broken_servers(1))
1329         d.addCallback(_set_basedir)
1330         d.addCallback(lambda ign:
1331             self._setup_and_upload())
1332         d.addCallback(_do_server_setup_2)
1333         d.addCallback(_remove_server)
1334         d.addCallback(lambda ign:
1335             self.shouldFail(UploadUnhappinessError,
1336                             "test_dropped_servers_in_encoder",
1337                             "shares could be placed on only 3 server(s) "
1338                             "such that any 3 of them have enough shares to "
1339                             "recover the file, but we were asked to place "
1340                             "shares on at least 4",
1341                             self._do_upload_with_broken_servers, 2))
1342         return d
1343
1344
1345     def test_merge_peers(self):
1346         # merge_peers merges a list of upload_servers and a dict of
1347         # shareid -> peerid mappings.
1348         shares = {
1349                     1 : set(["server1"]),
1350                     2 : set(["server2"]),
1351                     3 : set(["server3"]),
1352                     4 : set(["server4", "server5"]),
1353                     5 : set(["server1", "server2"]),
1354                  }
1355         # if not provided with a upload_servers argument, it should just
1356         # return the first argument unchanged.
1357         self.failUnlessEqual(shares, merge_peers(shares, set([])))
1358         class FakePeerTracker:
1359             pass
1360         trackers = []
1361         for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1362             t = FakePeerTracker()
1363             t.peerid = server
1364             t.buckets = [i]
1365             trackers.append(t)
1366         expected = {
1367                     1 : set(["server1"]),
1368                     2 : set(["server2"]),
1369                     3 : set(["server3"]),
1370                     4 : set(["server4", "server5"]),
1371                     5 : set(["server1", "server2", "server5"]),
1372                     6 : set(["server6"]),
1373                     7 : set(["server7"]),
1374                     8 : set(["server8"]),
1375                    }
1376         self.failUnlessEqual(expected, merge_peers(shares, set(trackers)))
1377         shares2 = {}
1378         expected = {
1379                     5 : set(["server5"]),
1380                     6 : set(["server6"]),
1381                     7 : set(["server7"]),
1382                     8 : set(["server8"]),
1383                    }
1384         self.failUnlessEqual(expected, merge_peers(shares2, set(trackers)))
1385         shares3 = {}
1386         trackers = []
1387         expected = {}
1388         for (i, server) in [(i, "server%d" % i) for i in xrange(10)]:
1389             shares3[i] = set([server])
1390             t = FakePeerTracker()
1391             t.peerid = server
1392             t.buckets = [i]
1393             trackers.append(t)
1394             expected[i] = set([server])
1395         self.failUnlessEqual(expected, merge_peers(shares3, set(trackers)))
1396
1397
1398     def test_servers_of_happiness_utility_function(self):
1399         # These tests are concerned with the servers_of_happiness()
1400         # utility function, and its underlying matching algorithm. Other
1401         # aspects of the servers_of_happiness behavior are tested
1402         # elsehwere These tests exist to ensure that
1403         # servers_of_happiness doesn't under or overcount the happiness
1404         # value for given inputs.
1405
1406         # servers_of_happiness expects a dict of
1407         # shnum => set(peerids) as a preexisting shares argument.
1408         test1 = {
1409                  1 : set(["server1"]),
1410                  2 : set(["server2"]),
1411                  3 : set(["server3"]),
1412                  4 : set(["server4"])
1413                 }
1414         happy = servers_of_happiness(test1)
1415         self.failUnlessEqual(4, happy)
1416         test1[4] = set(["server1"])
1417         # We've added a duplicate server, so now servers_of_happiness
1418         # should be 3 instead of 4.
1419         happy = servers_of_happiness(test1)
1420         self.failUnlessEqual(3, happy)
1421         # The second argument of merge_peers should be a set of
1422         # objects with peerid and buckets as attributes. In actual use,
1423         # these will be PeerTracker instances, but for testing it is fine
1424         # to make a FakePeerTracker whose job is to hold those instance
1425         # variables to test that part.
1426         class FakePeerTracker:
1427             pass
1428         trackers = []
1429         for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1430             t = FakePeerTracker()
1431             t.peerid = server
1432             t.buckets = [i]
1433             trackers.append(t)
1434         # Recall that test1 is a server layout with servers_of_happiness
1435         # = 3.  Since there isn't any overlap between the shnum ->
1436         # set([peerid]) correspondences in test1 and those in trackers,
1437         # the result here should be 7.
1438         test2 = merge_peers(test1, set(trackers))
1439         happy = servers_of_happiness(test2)
1440         self.failUnlessEqual(7, happy)
1441         # Now add an overlapping server to trackers. This is redundant,
1442         # so it should not cause the previously reported happiness value
1443         # to change.
1444         t = FakePeerTracker()
1445         t.peerid = "server1"
1446         t.buckets = [1]
1447         trackers.append(t)
1448         test2 = merge_peers(test1, set(trackers))
1449         happy = servers_of_happiness(test2)
1450         self.failUnlessEqual(7, happy)
1451         test = {}
1452         happy = servers_of_happiness(test)
1453         self.failUnlessEqual(0, happy)
1454         # Test a more substantial overlap between the trackers and the
1455         # existing assignments.
1456         test = {
1457             1 : set(['server1']),
1458             2 : set(['server2']),
1459             3 : set(['server3']),
1460             4 : set(['server4']),
1461         }
1462         trackers = []
1463         t = FakePeerTracker()
1464         t.peerid = 'server5'
1465         t.buckets = [4]
1466         trackers.append(t)
1467         t = FakePeerTracker()
1468         t.peerid = 'server6'
1469         t.buckets = [3, 5]
1470         trackers.append(t)
1471         # The value returned by servers_of_happiness is the size
1472         # of a maximum matching in the bipartite graph that
1473         # servers_of_happiness() makes between peerids and share
1474         # numbers. It should find something like this:
1475         # (server 1, share 1)
1476         # (server 2, share 2)
1477         # (server 3, share 3)
1478         # (server 5, share 4)
1479         # (server 6, share 5)
1480         #
1481         # and, since there are 5 edges in this matching, it should
1482         # return 5.
1483         test2 = merge_peers(test, set(trackers))
1484         happy = servers_of_happiness(test2)
1485         self.failUnlessEqual(5, happy)
1486         # Zooko's first puzzle:
1487         # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:156)
1488         #
1489         # server 1: shares 0, 1
1490         # server 2: shares 1, 2
1491         # server 3: share 2
1492         #
1493         # This should yield happiness of 3.
1494         test = {
1495             0 : set(['server1']),
1496             1 : set(['server1', 'server2']),
1497             2 : set(['server2', 'server3']),
1498         }
1499         self.failUnlessEqual(3, servers_of_happiness(test))
1500         # Zooko's second puzzle:
1501         # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:158)
1502         #
1503         # server 1: shares 0, 1
1504         # server 2: share 1
1505         #
1506         # This should yield happiness of 2.
1507         test = {
1508             0 : set(['server1']),
1509             1 : set(['server1', 'server2']),
1510         }
1511         self.failUnlessEqual(2, servers_of_happiness(test))
1512
1513
1514     def test_shares_by_server(self):
1515         test = dict([(i, set(["server%d" % i])) for i in xrange(1, 5)])
1516         sbs = shares_by_server(test)
1517         self.failUnlessEqual(set([1]), sbs["server1"])
1518         self.failUnlessEqual(set([2]), sbs["server2"])
1519         self.failUnlessEqual(set([3]), sbs["server3"])
1520         self.failUnlessEqual(set([4]), sbs["server4"])
1521         test1 = {
1522                     1 : set(["server1"]),
1523                     2 : set(["server1"]),
1524                     3 : set(["server1"]),
1525                     4 : set(["server2"]),
1526                     5 : set(["server2"])
1527                 }
1528         sbs = shares_by_server(test1)
1529         self.failUnlessEqual(set([1, 2, 3]), sbs["server1"])
1530         self.failUnlessEqual(set([4, 5]), sbs["server2"])
1531         # This should fail unless the peerid part of the mapping is a set
1532         test2 = {1: "server1"}
1533         self.shouldFail(AssertionError,
1534                        "test_shares_by_server",
1535                        "",
1536                        shares_by_server, test2)
1537
1538
1539     def test_existing_share_detection(self):
1540         self.basedir = self.mktemp()
1541         d = self._setup_and_upload()
1542         # Our final setup should look like this:
1543         # server 1: shares 0 - 9, read-only
1544         # server 2: empty
1545         # server 3: empty
1546         # server 4: empty
1547         # The purpose of this test is to make sure that the peer selector
1548         # knows about the shares on server 1, even though it is read-only.
1549         # It used to simply filter these out, which would cause the test
1550         # to fail when servers_of_happiness = 4.
1551         d.addCallback(lambda ign:
1552             self._add_server_with_share(1, 0, True))
1553         d.addCallback(lambda ign:
1554             self._add_server(2))
1555         d.addCallback(lambda ign:
1556             self._add_server(3))
1557         d.addCallback(lambda ign:
1558             self._add_server(4))
1559         def _copy_shares(ign):
1560             for i in xrange(1, 10):
1561                 self._copy_share_to_server(i, 1)
1562         d.addCallback(_copy_shares)
1563         d.addCallback(lambda ign:
1564             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1565         def _prepare_client(ign):
1566             client = self.g.clients[0]
1567             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1568             return client
1569         d.addCallback(_prepare_client)
1570         d.addCallback(lambda client:
1571             client.upload(upload.Data("data" * 10000, convergence="")))
1572         d.addCallback(lambda ign:
1573             self.failUnless(self._has_happy_share_distribution()))
1574         return d
1575
1576
    def test_query_counting(self):
        # If peer selection fails, Tahoe2PeerSelector prints out a lot
        # of helpful diagnostic information, including query stats.
        # This test helps make sure that that information is accurate.
        self.basedir = self.mktemp()
        d = self._setup_and_upload()
        def _setup(ign):
            # Ten fresh writable servers; the original share-holding
            # server 0 is removed, so no shares exist anywhere yet.
            for i in xrange(1, 11):
                self._add_server(server_number=i)
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
            c = self.g.clients[0]
            # We set happy to an unsatisfiable value so that we can check the
            # counting in the exception message. The same progress message
            # is also used when the upload is successful, but in that case it
            # only gets written to a log, so we can't see what it says.
            c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
            return c
        d.addCallback(_setup)
        # All ten writable servers accept shares, so all ten queries
        # should be counted as productive in the failure message.
        d.addCallback(lambda c:
            self.shouldFail(UploadUnhappinessError, "test_query_counting",
                            "10 queries placed some shares",
                            c.upload, upload.Data("data" * 10000,
                                                  convergence="")))
        # Now try with some readonly servers. We want to make sure that
        # the readonly peer share discovery phase is counted correctly.
        def _reset(ign):
            # Start over with a fresh grid directory for the next case.
            self.basedir = self.mktemp()
            self.g = None
        d.addCallback(_reset)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        def _then(ign):
            # Ten writable servers plus two read-only ones; the failure
            # message should attribute the two unproductive queries to
            # the servers being full.
            for i in xrange(1, 11):
                self._add_server(server_number=i)
            self._add_server(server_number=11, readonly=True)
            self._add_server(server_number=12, readonly=True)
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
            c = self.g.clients[0]
            c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
            return c
        d.addCallback(_then)
        d.addCallback(lambda c:
            self.shouldFail(UploadUnhappinessError, "test_query_counting",
                            "2 placed none (of which 2 placed none due to "
                            "the server being full",
                            c.upload, upload.Data("data" * 10000,
                                                  convergence="")))
        # Now try the case where the upload process finds a bunch of the
        # shares that it wants to place on the first server, including
        # the one that it wanted to allocate there. Though no shares will
        # be allocated in this request, it should still be called
        # productive, since it caused some homeless shares to be
        # removed.
        d.addCallback(_reset)
        d.addCallback(lambda ign:
            self._setup_and_upload())

        def _next(ign):
            for i in xrange(1, 11):
                self._add_server(server_number=i)
            # Copy all of the shares to server 9, since that will be
            # the first one that the selector sees.
            for i in xrange(10):
                self._copy_share_to_server(i, 9)
            # Remove server 0, and its contents
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
            # Make happiness unsatisfiable
            c = self.g.clients[0]
            c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
            return c
        d.addCallback(_next)
        d.addCallback(lambda c:
            self.shouldFail(UploadUnhappinessError, "test_query_counting",
                            "1 queries placed some shares",
                            c.upload, upload.Data("data" * 10000,
                                                  convergence="")))
        return d
1654
1655
1656     def test_upper_limit_on_readonly_queries(self):
1657         self.basedir = self.mktemp()
1658         d = self._setup_and_upload()
1659         def _then(ign):
1660             for i in xrange(1, 11):
1661                 self._add_server(server_number=i, readonly=True)
1662             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1663             c = self.g.clients[0]
1664             c.DEFAULT_ENCODING_PARAMETERS['k'] = 2
1665             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1666             c.DEFAULT_ENCODING_PARAMETERS['n'] = 4
1667             return c
1668         d.addCallback(_then)
1669         d.addCallback(lambda client:
1670             self.shouldFail(UploadUnhappinessError,
1671                             "test_upper_limit_on_readonly_queries",
1672                             "sent 8 queries to 8 peers",
1673                             client.upload,
1674                             upload.Data('data' * 10000, convergence="")))
1675         return d
1676
1677
    def test_exception_messages_during_peer_selection(self):
        # Run peer selection against several doomed grid layouts; in
        # each case the UploadUnhappinessError message must accurately
        # describe why the upload could not be made happy.
        #
        # First layout:
        # server 1: read-only, no shares
        # server 2: read-only, no shares
        # server 3: read-only, no shares
        # server 4: read-only, no shares
        # server 5: read-only, no shares
        # This will fail, but we want to make sure that the log messages
        # are informative about why it has failed.
        self.basedir = self.mktemp()
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server(server_number=1, readonly=True))
        d.addCallback(lambda ign:
            self._add_server(server_number=2, readonly=True))
        d.addCallback(lambda ign:
            self._add_server(server_number=3, readonly=True))
        d.addCallback(lambda ign:
            self._add_server(server_number=4, readonly=True))
        d.addCallback(lambda ign:
            self._add_server(server_number=5, readonly=True))
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        def _reset_encoding_parameters(ign, happy=4):
            # Reused by the later scenarios to (re)configure the
            # client's happiness target.
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
            return client
        d.addCallback(_reset_encoding_parameters)
        # All five servers are full, so the message should report five
        # full servers and no errors.
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
                            "placed 0 shares out of 10 "
                            "total (10 homeless), want to place shares on at "
                            "least 4 servers such that any 3 of them have "
                            "enough shares to recover the file, "
                            "sent 5 queries to 5 peers, 0 queries placed "
                            "some shares, 5 placed none "
                            "(of which 5 placed none due to the server being "
                            "full and 0 placed none due to an error)",
                            client.upload,
                            upload.Data("data" * 10000, convergence="")))


        # Second layout: one of the five servers is broken rather than
        # read-only, so the message must split the count into 4 "full"
        # and 1 "error".
        # server 1: read-only, no shares
        # server 2: broken, no shares
        # server 3: read-only, no shares
        # server 4: read-only, no shares
        # server 5: read-only, no shares
        def _reset(ign):
            # Fresh grid directory for each subsequent scenario.
            self.basedir = self.mktemp()
        d.addCallback(_reset)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server(server_number=1, readonly=True))
        d.addCallback(lambda ign:
            self._add_server(server_number=2))
        def _break_server_2(ign):
            server = self.g.servers_by_number[2].my_nodeid
            # We have to break the server in servers_by_id,
            # because the one in servers_by_number isn't wrapped,
            # and doesn't look at its broken attribute when answering
            # queries.
            self.g.servers_by_id[server].broken = True
        d.addCallback(_break_server_2)
        d.addCallback(lambda ign:
            self._add_server(server_number=3, readonly=True))
        d.addCallback(lambda ign:
            self._add_server(server_number=4, readonly=True))
        d.addCallback(lambda ign:
            self._add_server(server_number=5, readonly=True))
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
                            "placed 0 shares out of 10 "
                            "total (10 homeless), want to place shares on at "
                            "least 4 servers such that any 3 of them have "
                            "enough shares to recover the file, "
                            "sent 5 queries to 5 peers, 0 queries placed "
                            "some shares, 5 placed none "
                            "(of which 4 placed none due to the server being "
                            "full and 1 placed none due to an error)",
                            client.upload,
                            upload.Data("data" * 10000, convergence="")))
        # server 0, server 1 = empty, accepting shares
        # This should place all of the shares, but still fail with happy=4.
        # We want to make sure that the exception message is worded correctly.
        d.addCallback(_reset)
        d.addCallback(lambda ign:
            self._setup_grid())
        d.addCallback(lambda ign:
            self._add_server(server_number=1))
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
                            "shares could be placed or found on only 2 "
                            "server(s). We were asked to place shares on at "
                            "least 4 server(s) such that any 3 of them have "
                            "enough shares to recover the file.",
                            client.upload, upload.Data("data" * 10000,
                                                       convergence="")))
        # servers 0 - 4 = empty, accepting shares
        # This too should place all the shares, and this too should fail,
        # but since the effective happiness is more than the k encoding
        # parameter, it should trigger a different error message than the one
        # above.
        d.addCallback(_reset)
        d.addCallback(lambda ign:
            self._setup_grid())
        d.addCallback(lambda ign:
            self._add_server(server_number=1))
        d.addCallback(lambda ign:
            self._add_server(server_number=2))
        d.addCallback(lambda ign:
            self._add_server(server_number=3))
        d.addCallback(lambda ign:
            self._add_server(server_number=4))
        d.addCallback(_reset_encoding_parameters, happy=7)
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
                            "shares could be placed on only 5 server(s) such "
                            "that any 3 of them have enough shares to recover "
                            "the file, but we were asked to place shares on "
                            "at least 7 such servers.",
                            client.upload, upload.Data("data" * 10000,
                                                       convergence="")))
        # server 0: shares 0 - 9
        # server 1: share 0, read-only
        # server 2: share 0, read-only
        # server 3: share 0, read-only
        # This should place all of the shares, but fail with happy=4.
        # Since the number of servers with shares is more than the number
        # necessary to reconstitute the file, this will trigger a different
        # error message than either of those above.
        d.addCallback(_reset)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=0,
                                        readonly=True))
        d.addCallback(_reset_encoding_parameters, happy=7)
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
                            "shares could be placed or found on 4 server(s), "
                            "but they are not spread out evenly enough to "
                            "ensure that any 3 of these servers would have "
                            "enough shares to recover the file. We were asked "
                            "to place shares on at least 7 servers such that "
                            "any 3 of them have enough shares to recover the "
                            "file",
                            client.upload, upload.Data("data" * 10000,
                                                       convergence="")))
        return d
1837
1838
1839     def test_problem_layout_comment_187(self):
1840         # #778 comment 187 broke an initial attempt at a share
1841         # redistribution algorithm. This test is here to demonstrate the
1842         # breakage, and to test that subsequent algorithms don't also
1843         # break in the same way.
1844         self.basedir = self.mktemp()
1845         d = self._setup_and_upload(k=2, n=3)
1846
1847         # server 1: shares 0, 1, 2, readonly
1848         # server 2: share 0, readonly
1849         # server 3: share 0
1850         def _setup(ign):
1851             self._add_server_with_share(server_number=1, share_number=0,
1852                                         readonly=True)
1853             self._add_server_with_share(server_number=2, share_number=0,
1854                                         readonly=True)
1855             self._add_server_with_share(server_number=3, share_number=0)
1856             # Copy shares
1857             self._copy_share_to_server(1, 1)
1858             self._copy_share_to_server(2, 1)
1859             # Remove server 0
1860             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1861             client = self.g.clients[0]
1862             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 3
1863             return client
1864
1865         d.addCallback(_setup)
1866         d.addCallback(lambda client:
1867             client.upload(upload.Data("data" * 10000, convergence="")))
1868         d.addCallback(lambda ign:
1869             self.failUnless(self._has_happy_share_distribution()))
1870         return d
1871     test_problem_layout_comment_187.todo = "this isn't fixed yet"
1872
1873     def test_problem_layout_ticket_1118(self):
1874         # #1118 includes a report from a user who hit an assertion in
1875         # the upload code with this layout.
1876         self.basedir = self.mktemp()
1877         d = self._setup_and_upload(k=2, n=4)
1878
1879         # server 0: no shares
1880         # server 1: shares 0, 3
1881         # server 3: share 1
1882         # server 2: share 2
1883         # The order that they get queries is 0, 1, 3, 2
1884         def _setup(ign):
1885             self._add_server(server_number=0)
1886             self._add_server_with_share(server_number=1, share_number=0)
1887             self._add_server_with_share(server_number=2, share_number=2)
1888             self._add_server_with_share(server_number=3, share_number=1)
1889             # Copy shares
1890             self._copy_share_to_server(3, 1)
1891             storedir = self.get_serverdir(0)
1892             # remove the storedir, wiping out any existing shares
1893             shutil.rmtree(storedir)
1894             # create an empty storedir to replace the one we just removed
1895             os.mkdir(storedir)
1896             client = self.g.clients[0]
1897             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1898             return client
1899
1900         d.addCallback(_setup)
1901         # Note: actually it should succeed! See
1902         # test_problem_layout_ticket_1128. But ticket 1118 is just to
1903         # make it realize that it has failed, so if it raises
1904         # UploadUnhappinessError then we'll give it the green light
1905         # for now.
1906         d.addCallback(lambda ignored:
1907             self.shouldFail(UploadUnhappinessError,
1908                             "test_problem_layout_ticket_1118",
1909                             "",
1910                             self.g.clients[0].upload, upload.Data("data" * 10000,
1911                                                        convergence="")))
1912         return d
1913
1914     def test_problem_layout_ticket_1128(self):
1915         # #1118 includes a report from a user who hit an assertion in
1916         # the upload code with this layout.
1917         self.basedir = self.mktemp()
1918         d = self._setup_and_upload(k=2, n=4)
1919
1920         # server 0: no shares
1921         # server 1: shares 0, 3
1922         # server 3: share 1
1923         # server 2: share 2
1924         # The order that they get queries is 0, 1, 3, 2
1925         def _setup(ign):
1926             self._add_server(server_number=0)
1927             self._add_server_with_share(server_number=1, share_number=0)
1928             self._add_server_with_share(server_number=2, share_number=2)
1929             self._add_server_with_share(server_number=3, share_number=1)
1930             # Copy shares
1931             self._copy_share_to_server(3, 1)
1932             storedir = self.get_serverdir(0)
1933             # remove the storedir, wiping out any existing shares
1934             shutil.rmtree(storedir)
1935             # create an empty storedir to replace the one we just removed
1936             os.mkdir(storedir)
1937             client = self.g.clients[0]
1938             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1939             return client
1940
1941         d.addCallback(_setup)
1942         d.addCallback(lambda client:
1943                           client.upload(upload.Data("data" * 10000, convergence="")))
1944         d.addCallback(lambda ign:
1945             self.failUnless(self._has_happy_share_distribution()))
1946         return d
1947     test_problem_layout_ticket_1128.todo = "Invent a smarter uploader that uploads successfully in this case."
1948
1949     def test_upload_succeeds_with_some_homeless_shares(self):
1950         # If the upload is forced to stop trying to place shares before
1951         # it has placed (or otherwise accounted) for all of them, but it
1952         # has placed enough to satisfy the upload health criteria that
1953         # we're using, it should still succeed.
1954         self.basedir = self.mktemp()
1955         d = self._setup_and_upload()
1956         def _server_setup(ign):
1957             # Add four servers so that we have a layout like this:
1958             # server 1: share 0, read-only
1959             # server 2: share 1, read-only
1960             # server 3: share 2, read-only
1961             # server 4: share 3, read-only
1962             # If we set happy = 4, the upload will manage to satisfy
1963             # servers of happiness, but not place all of the shares; we
1964             # want to test that the upload is declared successful in
1965             # this case.
1966             self._add_server_with_share(server_number=1, share_number=0,
1967                                         readonly=True)
1968             self._add_server_with_share(server_number=2, share_number=1,
1969                                         readonly=True)
1970             self._add_server_with_share(server_number=3, share_number=2,
1971                                         readonly=True)
1972             self._add_server_with_share(server_number=4, share_number=3,
1973                                         readonly=True)
1974             # Remove server 0.
1975             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1976             # Set the client appropriately
1977             c = self.g.clients[0]
1978             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1979             return c
1980         d.addCallback(_server_setup)
1981         d.addCallback(lambda client:
1982             client.upload(upload.Data("data" * 10000, convergence="")))
1983         d.addCallback(lambda ign:
1984             self.failUnless(self._has_happy_share_distribution()))
1985         return d
1986
1987
1988     def test_uploader_skips_over_servers_with_only_one_share(self):
1989         # We want to make sure that the redistribution logic ignores
1990         # servers with only one share, since placing these shares
1991         # elsewhere will at best keep happiness the same as it was, and
1992         # at worst hurt it.
1993         self.basedir = self.mktemp()
1994         d = self._setup_and_upload()
1995         def _server_setup(ign):
1996             # Add some servers so that the upload will need to
1997             # redistribute, but will first pass over a couple of servers
1998             # that don't have enough shares to redistribute before
1999             # finding one that does have shares to redistribute.
2000             self._add_server_with_share(server_number=1, share_number=0)
2001             self._add_server_with_share(server_number=2, share_number=2)
2002             self._add_server_with_share(server_number=3, share_number=1)
2003             self._add_server_with_share(server_number=8, share_number=4)
2004             self._add_server_with_share(server_number=5, share_number=5)
2005             self._add_server_with_share(server_number=10, share_number=7)
2006             for i in xrange(4):
2007                 self._copy_share_to_server(i, 2)
2008             return self.g.clients[0]
2009         d.addCallback(_server_setup)
2010         d.addCallback(lambda client:
2011             client.upload(upload.Data("data" * 10000, convergence="")))
2012         d.addCallback(lambda ign:
2013             self.failUnless(self._has_happy_share_distribution()))
2014         return d
2015
2016
2017     def test_peer_selector_bucket_abort(self):
2018         # If peer selection for an upload fails due to an unhappy
2019         # layout, the peer selection process should abort the buckets it
2020         # allocates before failing, so that the space can be re-used.
2021         self.basedir = self.mktemp()
2022         self.set_up_grid(num_servers=5)
2023
2024         # Try to upload a file with happy=7, which is unsatisfiable with
2025         # the current grid. This will fail, but should not take up any
2026         # space on the storage servers after it fails.
2027         client = self.g.clients[0]
2028         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
2029         d = defer.succeed(None)
2030         d.addCallback(lambda ignored:
2031             self.shouldFail(UploadUnhappinessError,
2032                             "test_peer_selection_bucket_abort",
2033                             "",
2034                             client.upload, upload.Data("data" * 10000,
2035                                                        convergence="")))
2036         # wait for the abort messages to get there.
2037         def _turn_barrier(res):
2038             return fireEventually(res)
2039         d.addCallback(_turn_barrier)
2040         def _then(ignored):
2041             for server in self.g.servers_by_number.values():
2042                 self.failUnlessEqual(server.allocated_size(), 0)
2043         d.addCallback(_then)
2044         return d
2045
2046
2047     def test_encoder_bucket_abort(self):
2048         # If enough servers die in the process of encoding and uploading
2049         # a file to make the layout unhappy, we should cancel the
2050         # newly-allocated buckets before dying.
2051         self.basedir = self.mktemp()
2052         self.set_up_grid(num_servers=4)
2053
2054         client = self.g.clients[0]
2055         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
2056
2057         d = defer.succeed(None)
2058         d.addCallback(lambda ignored:
2059             self.shouldFail(UploadUnhappinessError,
2060                             "test_encoder_bucket_abort",
2061                             "",
2062                             self._do_upload_with_broken_servers, 1))
2063         def _turn_barrier(res):
2064             return fireEventually(res)
2065         d.addCallback(_turn_barrier)
2066         def _then(ignored):
2067             for server in self.g.servers_by_number.values():
2068                 self.failUnlessEqual(server.allocated_size(), 0)
2069         d.addCallback(_then)
2070         return d
2071
2072
2073     def _set_up_nodes_extra_config(self, clientdir):
2074         cfgfn = os.path.join(clientdir, "tahoe.cfg")
2075         oldcfg = open(cfgfn, "r").read()
2076         f = open(cfgfn, "wt")
2077         f.write(oldcfg)
2078         f.write("\n")
2079         f.write("[client]\n")
2080         f.write("shares.needed = 7\n")
2081         f.write("shares.total = 12\n")
2082         f.write("\n")
2083         f.close()
2084         return None
2085
2086 # TODO:
#  upload with exactly 75 peers (servers_of_happiness)
2088 #  have a download fail
2089 #  cancel a download (need to implement more cancel stuff)