# -*- coding: utf-8 -*-

import os, shutil
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.python.failure import Failure
from twisted.internet import defer
from foolscap.api import fireEventually

import allmydata # for __full_version__
from allmydata import uri, monitor, client
from allmydata.immutable import upload, encode
from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import DeferredListShouldSucceed
from allmydata.test.no_network import GridTestMixin
from allmydata.test.common_util import ShouldFailMixin
from allmydata.util.happinessutil import servers_of_happiness, \
                                         shares_by_server, merge_peers
from allmydata.storage_client import StorageFarmBroker
from allmydata.storage.server import storage_index_to_dir

MiB = 1024*1024

def extract_uri(results):
    return results.uri

# Some of these took longer than 480 seconds on Zandr's arm box, but this may
# have been due to an earlier test ERROR'ing out due to timeout, which seems
# to screw up subsequent tests.
timeout = 960
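# (trial reads a "timeout" attribute, in seconds, as the per-test timeout;
# the module-level assignment above is what the slow tests rely on.)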

class Uploadable(unittest.TestCase):
    def shouldEqual(self, data, expected):
        self.failUnless(isinstance(data, list))
        for e in data:
            self.failUnless(isinstance(e, str))
        s = "".join(data)
        self.failUnlessEqual(s, expected)

    def test_filehandle_random_key(self):
        return self._test_filehandle(convergence=None)

    def test_filehandle_convergent_encryption(self):
        return self._test_filehandle(convergence="some convergence string")

    def _test_filehandle(self, convergence):
        s = StringIO("a"*41)
        u = upload.FileHandle(s, convergence=convergence)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
        d.addCallback(lambda res: s.close()) # that privilege is reserved for us
        return d

    def test_filename(self):
        basedir = "upload/Uploadable/test_filename"
        os.makedirs(basedir)
        fn = os.path.join(basedir, "file")
        f = open(fn, "w")
        f.write("a"*41)
        f.close()
        u = upload.FileName(fn, convergence=None)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close())
        return d

    def test_data(self):
        s = "a"*41
        u = upload.Data(s, convergence=None)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close())
        return d

class ServerError(Exception):
    pass

class SetDEPMixin:
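    # In the parameter dicts used throughout this file, "k" is the number
    # of shares needed to reconstruct a file, "n" is the total number of
    # shares produced, and "happy" is the servers-of-happiness threshold
    # (is_happy_enough() below spells out the exact criterion).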
    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        p = {"k": k,
             "happy": happy,
             "n": n,
             "max_segment_size": max_segsize,
             }
        self.node.DEFAULT_ENCODING_PARAMETERS = p

class FakeStorageServer:
    def __init__(self, mode):
        self.mode = mode
        self.allocated = []
        self.queries = 0
        self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                         { "maximum-immutable-share-size": 2**32 },
                         "application-version": str(allmydata.__full_version__),
                         }
        if mode == "small":
            self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                             { "maximum-immutable-share-size": 10 },
                             "application-version": str(allmydata.__full_version__),
                             }


    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self, methname)
            return meth(*args, **kwargs)
        d = fireEventually()
        d.addCallback(lambda res: _call())
        return d

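    # The return convention below mirrors the real storage server's
    # allocate_buckets(): a tuple of (set of sharenums already stored
    # here, dict mapping each newly allocated sharenum to a BucketWriter).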
    def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
                         sharenums, share_size, canary):
        #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
        if self.mode == "first-fail":
            if self.queries == 0:
                raise ServerError
        if self.mode == "second-fail":
            if self.queries == 1:
                raise ServerError
        self.queries += 1
        if self.mode == "full":
            return (set(), {},)
        elif self.mode == "already got them":
            return (set(sharenums), {},)
        else:
            for shnum in sharenums:
                self.allocated.append( (storage_index, shnum) )
            return (set(),
                    dict([( shnum, FakeBucketWriter(share_size) )
                          for shnum in sharenums]),
                    )

class FakeBucketWriter:
    # a diagnostic version of storageserver.BucketWriter
    def __init__(self, size):
        self.data = StringIO()
        self.closed = False
        self._size = size

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self, "remote_" + methname)
            return meth(*args, **kwargs)
        d = fireEventually()
        d.addCallback(lambda res: _call())
        return d


    def callRemoteOnly(self, methname, *args, **kwargs):
        d = self.callRemote(methname, *args, **kwargs)
        del d # callRemoteOnly ignores this
        return None


    def remote_write(self, offset, data):
        precondition(not self.closed)
        precondition(offset >= 0)
        precondition(offset+len(data) <= self._size,
                     "offset=%d + data=%d > size=%d" %
                     (offset, len(data), self._size))
        self.data.seek(offset)
        self.data.write(data)

    def remote_close(self):
        precondition(not self.closed)
        self.closed = True

    def remote_abort(self):
        pass

class FakeClient:
    DEFAULT_ENCODING_PARAMETERS = {"k":25,
                                   "happy": 25,
                                   "n": 100,
                                   "max_segment_size": 1*MiB,
                                   }
    def __init__(self, mode="good", num_servers=50):
        self.num_servers = num_servers
        if type(mode) is str:
            mode = dict((i, mode) for i in range(num_servers))
        peers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
                  for fakeid in range(self.num_servers) ]
        self.storage_broker = StorageFarmBroker(None, permute_peers=True)
        for (serverid, server) in peers:
            self.storage_broker.test_add_server(serverid, server)
        self.last_peers = [p[1] for p in peers]

    def log(self, *args, **kwargs):
        pass
    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS
    def get_storage_broker(self):
        return self.storage_broker
    _secret_holder = client.SecretHolder("lease secret", "convergence secret")

class GotTooFarError(Exception):
    pass

class GiganticUploadable(upload.FileHandle):
    def __init__(self, size):
        self._size = size
        self._fp = 0

    def get_encryption_key(self):
        return defer.succeed("\x00" * 16)
    def get_size(self):
        return defer.succeed(self._size)
    def read(self, length):
        left = self._size - self._fp
        length = min(left, length)
        self._fp += length
        if self._fp > 1000000:
            # terminate the test early.
            raise GotTooFarError("we shouldn't be allowed to get this far")
        return defer.succeed(["\x00" * length])
    def close(self):
        pass

DATA = """
Once upon a time, there was a beautiful princess named Buttercup. She lived
in a magical land where every file was stored securely among millions of
machines, and nobody ever worried about their data being lost ever again.
The End.
"""
assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
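# (Files no larger than URI_LIT_SIZE_THRESHOLD are stored inline in a
# literal "LIT" URI instead of being erasure-coded into shares, so DATA
# must exceed that threshold for the share-placement tests below to be
# meaningful.)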

SIZE_ZERO = 0
SIZE_SMALL = 16
SIZE_LARGE = len(DATA)

def upload_data(uploader, data):
    u = upload.Data(data, convergence=None)
    return uploader.upload(u)
def upload_filename(uploader, filename):
    u = upload.FileName(filename, convergence=None)
    return uploader.upload(u)
def upload_filehandle(uploader, fh):
    u = upload.FileHandle(fh, convergence=None)
    return uploader.upload(u)
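
# These helpers wrap the three IUploadable front-ends the Uploader accepts
# (raw data, a named file, an open filehandle); each returns a Deferred
# that fires with the upload results, e.g. (illustrative):
#   d = upload_data(uploader, "some bytes")
#   d.addCallback(extract_uri)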

class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
    def setUp(self):
        self.node = FakeClient(mode="good")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _check_small(self, newuri, size):
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.LiteralFileURI))
        self.failUnlessEqual(len(u.data), size)

    def _check_large(self, newuri, size):
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def get_data(self, size):
        return DATA[:size]

    def test_too_large(self):
        # we've removed the 4GiB share size limit (see ticket #346 for
        # details), but still have an 8-byte field, so the limit is now
        # 2**64, so make sure we reject files larger than that.
        k = 3; happy = 7; n = 10
        self.set_encoding_parameters(k, happy, n)
        big = k*(2**64)
        data1 = GiganticUploadable(big)
        d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
                            "This file is too large to be uploaded (data_size)",
                            self.u.upload, data1)
        data2 = GiganticUploadable(big-3)
        d.addCallback(lambda res:
                      self.shouldFail(FileTooLargeError,
                                      "test_too_large-data2",
                                      "This file is too large to be uploaded (offsets)",
                                      self.u.upload, data2))
        # I don't know where the actual limit is.. it depends upon how large
        # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
        return d

    def test_data_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_data_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_data_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_data_large_odd_segments(self):
        data = self.get_data(SIZE_LARGE)
        segsize = int(SIZE_LARGE / 2.5)
        # we want 3 segments, since that's not a power of two
        self.set_encoding_parameters(25, 25, 100, segsize)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_filehandle_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_filehandle_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_filehandle_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_filename_zero(self):
        fn = "Uploader-test_filename_zero.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_ZERO)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_filename_small(self):
        fn = "Uploader-test_filename_small.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_SMALL)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_filename_large(self):
        fn = "Uploader-test_filename_large.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_LARGE)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
    def make_node(self, mode, num_servers=10):
        self.node = FakeClient(mode, num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _check_large(self, newuri, size):
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def test_first_error(self):
        mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
        self.make_node(mode)
        self.set_encoding_parameters(k=25, happy=1, n=50)
        d = upload_data(self.u, DATA)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_first_error_all(self):
        self.make_node("first-fail")
        d = self.shouldFail(UploadUnhappinessError, "first_error_all",
                            "peer selection failed",
                            upload_data, self.u, DATA)
        def _check((f,)):
            self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
            # there should also be a 'last failure was' message
            self.failUnlessIn("ServerError", str(f.value))
        d.addCallback(_check)
        return d

    def test_second_error(self):
        # we want to make sure we make it to a third pass. This means that
        # the first pass was insufficient to place all shares, and at least
        # one of the second-pass servers (other than the last one) accepted a
        # share (so we'll believe that a third pass will be useful). (if
        # everyone but the last server throws an error, then we'll send all
        # the remaining shares to the last server at the end of the second
        # pass, and if that succeeds, we won't make it to a third pass).
        #
        # we can achieve this 97.5% of the time by using 40 servers, having
        # 39 of them fail on the second request, leaving only one to succeed
        # on the second request. (we need to keep the number of servers low
        # enough to ensure a second pass with 100 shares).
        mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
        self.make_node(mode, 40)
        d = upload_data(self.u, DATA)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_second_error_all(self):
        self.make_node("second-fail")
        d = self.shouldFail(UploadUnhappinessError, "second_error_all",
                            "peer selection failed",
                            upload_data, self.u, DATA)
        def _check((f,)):
            self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
            # there should also be a 'last failure was' message
            self.failUnlessIn("ServerError", str(f.value))
        d.addCallback(_check)
        return d

class FullServer(unittest.TestCase):
    def setUp(self):
        self.node = FakeClient(mode="full")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _should_fail(self, f):
        self.failUnless(isinstance(f, Failure) and f.check(UploadUnhappinessError), f)

    def test_data_large(self):
        data = DATA
        d = upload_data(self.u, data)
        d.addBoth(self._should_fail)
        return d

class PeerSelection(unittest.TestCase):

    def make_client(self, num_servers=50):
        self.node = FakeClient(mode="good", num_servers=num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def get_data(self, size):
        return DATA[:size]

    def _check_large(self, newuri, size):
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        p = {"k": k,
             "happy": happy,
             "n": n,
             "max_segment_size": max_segsize,
             }
        self.node.DEFAULT_ENCODING_PARAMETERS = p

    def test_one_each(self):
        # if we have 50 shares, and there are 50 peers, and they all accept a
        # share, we should get exactly one share per peer

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(25, 30, 50)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 1)
                self.failUnlessEqual(p.queries, 1)
        d.addCallback(_check)
        return d

    def test_two_each(self):
        # if we have 100 shares, and there are 50 peers, and they all accept
        # all shares, we should get exactly two shares per peer

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        # if there are 50 peers, then happy needs to be <= 50
        self.set_encoding_parameters(50, 50, 100)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 2)
                self.failUnlessEqual(p.queries, 2)
        d.addCallback(_check)
        return d

    def test_one_each_plus_one_extra(self):
        # if we have 51 shares, and there are 50 peers, then one peer gets
        # two shares and the rest get just one

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(24, 41, 51)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            got_one = []
            got_two = []
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnless(len(allocated) in (1,2), len(allocated))
                if len(allocated) == 1:
                    self.failUnlessEqual(p.queries, 1)
                    got_one.append(p)
                else:
                    self.failUnlessEqual(p.queries, 2)
                    got_two.append(p)
            self.failUnlessEqual(len(got_one), 49)
            self.failUnlessEqual(len(got_two), 1)
        d.addCallback(_check)
        return d

    def test_four_each(self):
        # if we have 200 shares, and there are 50 peers, then each peer gets
        # 4 shares. The design goal is to accomplish this with only two
        # queries per peer.

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        # if there are 50 peers, then happy should be no more than 50 if
        # we want this to work.
        self.set_encoding_parameters(100, 50, 200)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 4)
                self.failUnlessEqual(p.queries, 2)
        d.addCallback(_check)
        return d

    def test_three_of_ten(self):
        # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
        # 4+4+2

        self.make_client(3)
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 3, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            counts = {}
            for p in self.node.last_peers:
                allocated = p.allocated
                counts[len(allocated)] = counts.get(len(allocated), 0) + 1
            histogram = [counts.get(i, 0) for i in range(5)]
            self.failUnlessEqual(histogram, [0,0,0,2,1])
        d.addCallback(_check)
        return d

    def test_some_big_some_small(self):
        # 10 shares, 20 servers, but half the servers don't support a
        # share-size large enough for our file
        mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)])
        self.node = FakeClient(mode, num_servers=20)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 5, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            # we should have put one share each on the big peers, and zero
            # shares on the small peers
            total_allocated = 0
            for p in self.node.last_peers:
                if p.mode == "good":
                    self.failUnlessEqual(len(p.allocated), 1)
                elif p.mode == "small":
                    self.failUnlessEqual(len(p.allocated), 0)
                total_allocated += len(p.allocated)
            self.failUnlessEqual(total_allocated, 10)
        d.addCallback(_check)
        return d


class StorageIndex(unittest.TestCase):
    def test_params_must_matter(self):
        DATA = "I am some data"
        u = upload.Data(DATA, convergence="")
        eu = upload.EncryptAnUploadable(u)
        d1 = eu.get_storage_index()

        # CHK means the same data should encrypt the same way
        u = upload.Data(DATA, convergence="")
        eu = upload.EncryptAnUploadable(u)
        d1a = eu.get_storage_index()

        # but if we use a different convergence string it should be different
        u = upload.Data(DATA, convergence="wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt1 = eu.get_storage_index()

        # and if we add yet a different convergence it should be different again
        u = upload.Data(DATA, convergence="NOT wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt2 = eu.get_storage_index()

        # and if we use the first string again it should be the same as last time
        u = upload.Data(DATA, convergence="wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt1a = eu.get_storage_index()

        # and if we change the encoding parameters, it should be different
        # (from the same convergence string with different encoding parameters)
        u = upload.Data(DATA, convergence="")
        u.encoding_param_k = u.default_encoding_param_k + 1
        eu = upload.EncryptAnUploadable(u)
        d2 = eu.get_storage_index()

        # and if we use a random key, it should be different than the CHK
        u = upload.Data(DATA, convergence=None)
        eu = upload.EncryptAnUploadable(u)
        d3 = eu.get_storage_index()
        # and different from another instance
        u = upload.Data(DATA, convergence=None)
        eu = upload.EncryptAnUploadable(u)
        d4 = eu.get_storage_index()

        d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
        def _done(res):
            si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
            self.failUnlessEqual(si1, si1a)
            self.failIfEqual(si1, si2)
            self.failIfEqual(si1, si3)
            self.failIfEqual(si1, si4)
            self.failIfEqual(si3, si4)
            self.failIfEqual(si1salt1, si1)
            self.failIfEqual(si1salt1, si1salt2)
            self.failIfEqual(si1salt2, si1)
            self.failUnlessEqual(si1salt1, si1salt1a)
        d.addCallback(_done)
        return d

# copied from python docs because itertools.combinations was added in
# python 2.6 and we support >= 2.4.
def combinations(iterable, r):
    # combinations('ABCD', 2) --> AB AC AD BC BD CD
    # combinations(range(4), 3) --> 012 013 023 123
    pool = tuple(iterable)
    n = len(pool)
    if r > n:
        return
    indices = range(r)
    yield tuple(pool[i] for i in indices)
    while True:
        for i in reversed(range(r)):
            if indices[i] != i + n - r:
                break
        else:
            return
        indices[i] += 1
        for j in range(i+1, r):
            indices[j] = indices[j-1] + 1
        yield tuple(pool[i] for i in indices)

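# A quick sanity check of combinations(), wrapped in a function so that
# importing this module stays side-effect free (illustrative only, not
# part of the original test suite):
def _demo_combinations():
    pairs = list(combinations("ABCD", 2))
    assert pairs == [("A","B"), ("A","C"), ("A","D"),
                     ("B","C"), ("B","D"), ("C","D")]
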
def is_happy_enough(servertoshnums, h, k):
    """ I calculate whether servertoshnums achieves happiness level h. I do
    this with a naïve "brute force search" approach. (See
    src/allmydata/util/happinessutil.py for a better algorithm.) """
    if len(servertoshnums) < h:
        return False
    # print "servertoshnums: ", servertoshnums, h, k
    for happysetcombo in combinations(servertoshnums.iterkeys(), h):
        # print "happysetcombo: ", happysetcombo
        for subsetcombo in combinations(happysetcombo, k):
            shnums = reduce(set.union, [ servertoshnums[s] for s in subsetcombo ])
            # print "subsetcombo: ", subsetcombo, ", shnums: ", shnums
            if len(shnums) < k:
                # print "NOT HAPPY", shnums, k
                return False
    # print "HAPPY"
    return True

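# An illustrative sketch of the criterion above (not part of the original
# test suite; the server names are made up): with k=2 and h=3, three
# servers holding distinct shares are happy, while three servers all
# holding the same single share are not.
def _demo_is_happy_enough():
    spread_out = {"s0": set([0]), "s1": set([1]), "s2": set([2])}
    piled_up   = {"s0": set([0]), "s1": set([0]), "s2": set([0])}
    assert is_happy_enough(spread_out, h=3, k=2)
    assert not is_happy_enough(piled_up, h=3, k=2)
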
class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
    ShouldFailMixin):
    def find_all_shares(self, unused=None):
        """Locate shares on disk. Returns a dict that maps
        server to set of sharenums.
        """
        assert self.g, "I tried to find a grid at self.g, but failed"
        servertoshnums = {} # k: server, v: set(shnum)

        for i, c in self.g.servers_by_number.iteritems():
            for (dirp, dirns, fns) in os.walk(c.sharedir):
                for fn in fns:
                    try:
                        sharenum = int(fn)
                    except ValueError:
                        # Whoops, I guess that's not a share file then.
                        pass
                    else:
                        servertoshnums.setdefault(i, set()).add(sharenum)

        return servertoshnums

    def _do_upload_with_broken_servers(self, servers_to_break):
        """
        I act like a normal upload, but before I send the results of
        Tahoe2PeerSelector to the Encoder, I break the first servers_to_break
        PeerTrackers in the used_peers part of the return result.
        """
        assert self.g, "I tried to find a grid at self.g, but failed"
        broker = self.g.clients[0].storage_broker
        sh     = self.g.clients[0]._secret_holder
        data = upload.Data("data" * 10000, convergence="")
        data.encoding_param_k = 3
        data.encoding_param_happy = 4
        data.encoding_param_n = 10
        uploadable = upload.EncryptAnUploadable(data)
        encoder = encode.Encoder()
        encoder.set_encrypted_uploadable(uploadable)
        status = upload.UploadStatus()
        selector = upload.Tahoe2PeerSelector("dglev", "test", status)
        storage_index = encoder.get_param("storage_index")
        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        d = selector.get_shareholders(broker, sh, storage_index,
                                      share_size, block_size, num_segments,
                                      10, 3, 4)
        def _have_shareholders((used_peers, already_peers)):
            assert servers_to_break <= len(used_peers)
            for index in xrange(servers_to_break):
                server = list(used_peers)[index]
                for share in server.buckets.keys():
                    server.buckets[share].abort()
            buckets = {}
            servermap = already_peers.copy()
            for peer in used_peers:
                buckets.update(peer.buckets)
                for bucket in peer.buckets:
                    servermap.setdefault(bucket, set()).add(peer.peerid)
            encoder.set_shareholders(buckets, servermap)
            d = encoder.start()
            return d
        d.addCallback(_have_shareholders)
        return d

    def _has_happy_share_distribution(self):
        servertoshnums = self.find_all_shares()
        k = self.g.clients[0].DEFAULT_ENCODING_PARAMETERS['k']
        h = self.g.clients[0].DEFAULT_ENCODING_PARAMETERS['happy']
        return is_happy_enough(servertoshnums, h, k)

    def _add_server(self, server_number, readonly=False):
        assert self.g, "I tried to find a grid at self.g, but failed"
        ss = self.g.make_server(server_number, readonly)
        self.g.add_server(server_number, ss)


    def _add_server_with_share(self, server_number, share_number=None,
                               readonly=False):
        self._add_server(server_number, readonly)
        if share_number is not None:
            self._copy_share_to_server(share_number, server_number)


    def _copy_share_to_server(self, share_number, server_number):
        ss = self.g.servers_by_number[server_number]
        # Copy share i from the directory associated with the first
        # storage server to the directory associated with this one.
        assert self.g, "I tried to find a grid at self.g, but failed"
        assert self.shares, "I tried to find shares at self.shares, but failed"
        old_share_location = self.shares[share_number][2]
        new_share_location = os.path.join(ss.storedir, "shares")
        si = uri.from_string(self.uri).get_storage_index()
        new_share_location = os.path.join(new_share_location,
                                          storage_index_to_dir(si))
        if not os.path.exists(new_share_location):
            os.makedirs(new_share_location)
        new_share_location = os.path.join(new_share_location,
                                          str(share_number))
        if old_share_location != new_share_location:
            shutil.copy(old_share_location, new_share_location)
        shares = self.find_uri_shares(self.uri)
        # Make sure that the storage server has the share.
        self.failUnless((share_number, ss.my_nodeid, new_share_location)
                        in shares)

    def _setup_grid(self):
        """
        I set up a NoNetworkGrid with a single server and client.
        """
        self.set_up_grid(num_clients=1, num_servers=1)

    def _setup_and_upload(self, **kwargs):
        """
        I set up a NoNetworkGrid with a single server and client,
        upload a file to it, store its uri in self.uri, and store its
        sharedata in self.shares.
        """
        self._setup_grid()
        client = self.g.clients[0]
        client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
        if "n" in kwargs and "k" in kwargs:
            client.DEFAULT_ENCODING_PARAMETERS['k'] = kwargs['k']
            client.DEFAULT_ENCODING_PARAMETERS['n'] = kwargs['n']
        data = upload.Data("data" * 10000, convergence="")
        self.data = data
        d = client.upload(data)
        def _store_uri(ur):
            self.uri = ur.uri
        d.addCallback(_store_uri)
        d.addCallback(lambda ign:
            self.find_uri_shares(self.uri))
        def _store_shares(shares):
            self.shares = shares
        d.addCallback(_store_shares)
        return d


    def test_configure_parameters(self):
        self.basedir = self.mktemp()
        hooks = {0: self._set_up_nodes_extra_config}
        self.set_up_grid(client_config_hooks=hooks)
        c0 = self.g.clients[0]

        DATA = "data" * 100
        u = upload.Data(DATA, convergence="")
        d = c0.upload(u)
        d.addCallback(lambda ur: c0.create_node_from_uri(ur.uri))
        m = monitor.Monitor()
        d.addCallback(lambda fn: fn.check(m))
        def _check(cr):
            data = cr.get_data()
            self.failUnlessEqual(data["count-shares-needed"], 7)
            self.failUnlessEqual(data["count-shares-expected"], 12)
        d.addCallback(_check)
        return d


    def _setUp(self, ns):
        # Used by test_happy_semantics and test_preexisting_share_behavior
        # to set up the grid.
        self.node = FakeClient(mode="good", num_servers=ns)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node


    def test_happy_semantics(self):
        self._setUp(2)
        DATA = upload.Data("kittens" * 10000, convergence="")
        # These parameters are unsatisfiable with only 2 servers.
        self.set_encoding_parameters(k=3, happy=5, n=10)
        d = self.shouldFail(UploadUnhappinessError, "test_happy_semantics",
                            "shares could be placed or found on only 2 "
                            "server(s). We were asked to place shares on "
                            "at least 5 server(s) such that any 3 of them "
                            "have enough shares to recover the file",
                            self.u.upload, DATA)
        # Let's reset the client to have 10 servers
        d.addCallback(lambda ign:
            self._setUp(10))
        # These parameters are satisfiable with 10 servers.
        d.addCallback(lambda ign:
            self.set_encoding_parameters(k=3, happy=5, n=10))
        d.addCallback(lambda ign:
            self.u.upload(DATA))
        # Let's reset the client to have 7 servers
        # (this is less than n, but more than h)
        d.addCallback(lambda ign:
            self._setUp(7))
        # These parameters are satisfiable with 7 servers.
        d.addCallback(lambda ign:
            self.set_encoding_parameters(k=3, happy=5, n=10))
        d.addCallback(lambda ign:
            self.u.upload(DATA))
        return d

    def test_aborted_shares(self):
        self.basedir = "upload/EncodingParameters/aborted_shares"
        self.set_up_grid(num_servers=4)
        c = self.g.clients[0]
        DATA = upload.Data(100* "kittens", convergence="")
        # These parameters are unsatisfiable with only 4 servers, but should
        # work with 5, as long as the original 4 are not stuck in the open
        # BucketWriter state (open() but not closed).
        parms = {"k":2, "happy":5, "n":5, "max_segment_size": 1*MiB}
        c.DEFAULT_ENCODING_PARAMETERS = parms
        d = self.shouldFail(UploadUnhappinessError, "test_aborted_shares",
                            "shares could be placed on only 4 "
                            "server(s) such that any 2 of them have enough "
                            "shares to recover the file, but we were asked "
                            "to place shares on at least 5 such servers",
                            c.upload, DATA)
        # now add the 5th server
        d.addCallback(lambda ign: self._add_server(4, False))
        # and this time the upload ought to succeed
        d.addCallback(lambda ign: c.upload(DATA))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))
        return d


    def test_problem_layout_comment_52(self):
        def _basedir():
            self.basedir = self.mktemp()
        _basedir()
        # This scenario is at
        # http://allmydata.org/trac/tahoe/ticket/778#comment:52
        #
        # The scenario in comment:52 proposes that we have a layout
        # like:
        # server 0: shares 1 - 9
        # server 1: share 0, read-only
        # server 2: share 0, read-only
        # server 3: share 0, read-only
        # To get access to the shares, we will first upload to one
        # server, which will then have shares 0 - 9. We'll then
        # add three new servers, configure them to not accept any new
        # shares, then write share 0 directly into the serverdir of each,
        # and then remove share 0 from server 0 in the same way.
        # Then each of servers 1 - 3 will report that they have share 0,
        # and will not accept any new share, while server 0 will report that
        # it has shares 1 - 9 and will accept new shares.
        # We'll then set 'happy' = 4, and see that an upload fails
        # (as it should)
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=0,
                                        readonly=True))
        # Remove the first share from server 0.
        def _remove_share_0_from_server_0():
            share_location = self.shares[0][2]
            os.remove(share_location)
        d.addCallback(lambda ign:
            _remove_share_0_from_server_0())
        # Set happy = 4 in the client.
        def _prepare():
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
            return client
        d.addCallback(lambda ign:
            _prepare())
        # Uploading data should fail
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError,
                            "test_problem_layout_comment_52_test_1",
                            "shares could be placed or found on 4 server(s), "
                            "but they are not spread out evenly enough to "
                            "ensure that any 3 of these servers would have "
                            "enough shares to recover the file. "
                            "We were asked to place shares on at "
                            "least 4 servers such that any 3 of them have "
                            "enough shares to recover the file",
                            client.upload, upload.Data("data" * 10000,
                                                       convergence="")))

        # Do comment:52, but like this:
        # server 2: empty
        # server 3: share 0, read-only
        # server 1: share 0, read-only
        # server 0: shares 0-9
        d.addCallback(lambda ign:
            _basedir())
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server(server_number=2))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=0,
                                        readonly=True))
        def _prepare2():
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
            return client
        d.addCallback(lambda ign:
            _prepare2())
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError,
                            "test_problem_layout_comment_52_test_2",
                            "shares could be placed on only 3 server(s) such "
                            "that any 3 of them have enough shares to recover "
                            "the file, but we were asked to place shares on "
                            "at least 4 such servers.",
                            client.upload, upload.Data("data" * 10000,
                                                       convergence="")))
        return d


    def test_problem_layout_comment_53(self):
        # This scenario is at
        # http://allmydata.org/trac/tahoe/ticket/778#comment:53
        #
        # Set up the grid to have one server
        def _change_basedir(ign):
            self.basedir = self.mktemp()
        _change_basedir(None)
        # We start by uploading all of the shares to one server.
        # Next, we'll add three new servers to our NoNetworkGrid. We'll add
        # one share from our initial upload to each of these.
        # The counterintuitive ordering of the share numbers is to deal with
        # the permuting of these servers -- distributing the shares this
        # way ensures that the Tahoe2PeerSelector sees them in the order
        # described below.
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1))
        # So, we now have the following layout:
        # server 0: shares 0 - 9
        # server 1: share 2
        # server 2: share 0
        # server 3: share 1
        # We change the 'happy' parameter in the client to 4.
        # The Tahoe2PeerSelector will see the peers permuted as:
        # 2, 3, 1, 0
        # Ideally, a reupload of our original data should work.
        def _reset_encoding_parameters(ign, happy=4):
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
            return client
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))


        # This scenario is basically comment:53, but changed so that the
        # Tahoe2PeerSelector sees the server with all of the shares before
        # any of the other servers.
        # The layout is:
        # server 2: shares 0 - 9
        # server 3: share 0
        # server 1: share 1
        # server 4: share 2
        # The Tahoe2PeerSelector sees the peers permuted as:
        # 2, 3, 1, 4
        # Note that server 0 has been replaced by server 4; this makes it
        # easier to ensure that the last server seen by Tahoe2PeerSelector
        # has only one share.
        d.addCallback(_change_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2))
        # Copy all of the other shares to server number 2
        def _copy_shares(ign):
            for i in xrange(0, 10):
                self._copy_share_to_server(i, 2)
        d.addCallback(_copy_shares)
        # Remove the first server, and add a placeholder with share 0
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=4, share_number=0))
        # Now try uploading.
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))


        # Try the same thing, but with empty servers after the first one
        # We want to make sure that Tahoe2PeerSelector will redistribute
        # shares as necessary, not simply discover an existing layout.
        # The layout is:
        # server 2: shares 0 - 9
        # server 3: empty
        # server 1: empty
        # server 4: empty
        d.addCallback(_change_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server(server_number=2))
        d.addCallback(lambda ign:
            self._add_server(server_number=3))
        d.addCallback(lambda ign:
            self._add_server(server_number=1))
        d.addCallback(lambda ign:
            self._add_server(server_number=4))
        d.addCallback(_copy_shares)
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        # Make sure that only as many shares as necessary to satisfy
        # servers of happiness were pushed.
        d.addCallback(lambda results:
            self.failUnlessEqual(results.pushed_shares, 3))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))
        return d

    def test_problem_layout_ticket1124(self):
        self.basedir = self.mktemp()
        d = self._setup_and_upload(k=2, n=4)

        # server 0: shares 0, 1, 2, 3
        # server 1: shares 0, 3
        # server 2: share 1
        # server 3: share 2
        # With this layout, an upload should just be satisfied that the
        # current distribution is good enough, right?
        def _setup(ign):
            self._add_server_with_share(server_number=0, share_number=None)
            self._add_server_with_share(server_number=1, share_number=0)
            self._add_server_with_share(server_number=2, share_number=1)
            self._add_server_with_share(server_number=3, share_number=2)
            # Copy shares
            self._copy_share_to_server(3, 1)
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
            return client

        d.addCallback(_setup)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))
        return d
    test_problem_layout_ticket1124.todo = "Fix this after 1.7.1 release."
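    # (trial treats a test with a ".todo" attribute as an expected failure)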
1186
1187     def test_happiness_with_some_readonly_peers(self):
1188         # Try the following layout
1189         # server 2: shares 0-9
1190         # server 4: share 0, read-only
1191         # server 3: share 1, read-only
1192         # server 1: share 2, read-only
1193         self.basedir = self.mktemp()
1194         d = self._setup_and_upload()
1195         d.addCallback(lambda ign:
1196             self._add_server_with_share(server_number=2, share_number=0))
1197         d.addCallback(lambda ign:
1198             self._add_server_with_share(server_number=3, share_number=1,
1199                                         readonly=True))
1200         d.addCallback(lambda ign:
1201             self._add_server_with_share(server_number=1, share_number=2,
1202                                         readonly=True))
1203         # Copy all of the other shares to server number 2
1204         def _copy_shares(ign):
1205             for i in xrange(1, 10):
1206                 self._copy_share_to_server(i, 2)
1207         d.addCallback(_copy_shares)
1208         # Remove server 0, and add another in its place
1209         d.addCallback(lambda ign:
1210             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1211         d.addCallback(lambda ign:
1212             self._add_server_with_share(server_number=4, share_number=0,
1213                                         readonly=True))
1214         def _reset_encoding_parameters(ign, happy=4):
1215             client = self.g.clients[0]
1216             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1217             return client
1218         d.addCallback(_reset_encoding_parameters)
1219         d.addCallback(lambda client:
1220             client.upload(upload.Data("data" * 10000, convergence="")))
1221         d.addCallback(lambda ign:
1222             self.failUnless(self._has_happy_share_distribution()))
1223         return d
1224
1225
1226     def test_happiness_with_all_readonly_peers(self):
1227         # server 3: share 1, read-only
1228         # server 1: share 2, read-only
1229         # server 2: shares 0-9, read-only
1230         # server 4: share 0, read-only
1231         # The idea with this test is to make sure that the survey of
1232         # read-only peers doesn't undercount servers of happiness
1233         self.basedir = self.mktemp()
1234         d = self._setup_and_upload()
1235         d.addCallback(lambda ign:
1236             self._add_server_with_share(server_number=4, share_number=0,
1237                                         readonly=True))
1238         d.addCallback(lambda ign:
1239             self._add_server_with_share(server_number=3, share_number=1,
1240                                         readonly=True))
1241         d.addCallback(lambda ign:
1242             self._add_server_with_share(server_number=1, share_number=2,
1243                                         readonly=True))
1244         d.addCallback(lambda ign:
1245             self._add_server_with_share(server_number=2, share_number=0,
1246                                         readonly=True))
1247         def _copy_shares(ign):
1248             for i in xrange(1, 10):
1249                 self._copy_share_to_server(i, 2)
1250         d.addCallback(_copy_shares)
1251         d.addCallback(lambda ign:
1252             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1253         def _reset_encoding_parameters(ign, happy=4):
1254             client = self.g.clients[0]
1255             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1256             return client
1257         d.addCallback(_reset_encoding_parameters)
1258         d.addCallback(lambda client:
1259             client.upload(upload.Data("data" * 10000, convergence="")))
1260         d.addCallback(lambda ign:
1261             self.failUnless(self._has_happy_share_distribution()))
1262         return d
1263
1264
1265     def test_dropped_servers_in_encoder(self):
1266         # The Encoder does its own "servers_of_happiness" check if it
1267         # happens to lose a bucket during an upload (it assumes that
1268         # the layout presented to it satisfies "servers_of_happiness"
1269         # until a failure occurs)
1270         #
1271         # This test simulates an upload where servers break after peer
1272         # selection, but before they are written to.
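        # For example, with k = 3 and happy = 4 on the five-server layout
        # built below: breaking one server still leaves shares on 4 >= happy
        # servers, so the upload should succeed; breaking two leaves only
        # 3 < happy, so the Encoder must fail with UploadUnhappinessError.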
1273         def _set_basedir(ign=None):
1274             self.basedir = self.mktemp()
1275         _set_basedir()
1276         d = self._setup_and_upload()
1277         # Add 5 servers
1278         def _do_server_setup(ign):
1279             self._add_server(server_number=1)
1280             self._add_server(server_number=2)
1281             self._add_server(server_number=3)
1282             self._add_server(server_number=4)
1283             self._add_server(server_number=5)
1284         d.addCallback(_do_server_setup)
1285         # remove the original server
1286         # (necessary to ensure that the Tahoe2PeerSelector will distribute
1287         #  all the shares)
1288         def _remove_server(ign):
1289             server = self.g.servers_by_number[0]
1290             self.g.remove_server(server.my_nodeid)
1291         d.addCallback(_remove_server)
1292         # This should succeed; we still have 4 servers, and the
1293         # happiness of the upload is 4.
1294         d.addCallback(lambda ign:
1295             self._do_upload_with_broken_servers(1))
1296         # Now, do the same thing over again, but drop 2 servers instead
1297         # of 1. This should fail, because servers_of_happiness is 4 and
1298         # we can't satisfy that.
1299         d.addCallback(_set_basedir)
1300         d.addCallback(lambda ign:
1301             self._setup_and_upload())
1302         d.addCallback(_do_server_setup)
1303         d.addCallback(_remove_server)
1304         d.addCallback(lambda ign:
1305             self.shouldFail(UploadUnhappinessError,
1306                             "test_dropped_servers_in_encoder",
1307                             "shares could be placed on only 3 server(s) "
1308                             "such that any 3 of them have enough shares to "
1309                             "recover the file, but we were asked to place "
1310                             "shares on at least 4",
1311                             self._do_upload_with_broken_servers, 2))
1312         # Now do the same thing over again, but make some of the servers
1313         # readonly, break some of the ones that aren't, and make sure that
1314         # happiness accounting is preserved.
1315         d.addCallback(_set_basedir)
1316         d.addCallback(lambda ign:
1317             self._setup_and_upload())
1318         def _do_server_setup_2(ign):
1319             self._add_server(1)
1320             self._add_server(2)
1321             self._add_server(3)
1322             self._add_server_with_share(4, 7, readonly=True)
1323             self._add_server_with_share(5, 8, readonly=True)
1324         d.addCallback(_do_server_setup_2)
1325         d.addCallback(_remove_server)
1326         d.addCallback(lambda ign:
1327             self._do_upload_with_broken_servers(1))
1328         d.addCallback(_set_basedir)
1329         d.addCallback(lambda ign:
1330             self._setup_and_upload())
1331         d.addCallback(_do_server_setup_2)
1332         d.addCallback(_remove_server)
1333         d.addCallback(lambda ign:
1334             self.shouldFail(UploadUnhappinessError,
1335                             "test_dropped_servers_in_encoder",
1336                             "shares could be placed on only 3 server(s) "
1337                             "such that any 3 of them have enough shares to "
1338                             "recover the file, but we were asked to place "
1339                             "shares on at least 4",
1340                             self._do_upload_with_broken_servers, 2))
1341         return d
1342
1343
1344     def test_merge_peers(self):
1345         # merge_peers merges a set of used_peers (PeerTracker-like
1346         # objects) into a dict of shareid -> set(peerid) mappings.
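        # For instance, merging {1: set(["a"])} with a tracker for peer "b"
        # that holds bucket 2 should yield {1: set(["a"]), 2: set(["b"])}.
        # (Peer names here are illustrative.)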
1347         shares = {
1348                     1 : set(["server1"]),
1349                     2 : set(["server2"]),
1350                     3 : set(["server3"]),
1351                     4 : set(["server4", "server5"]),
1352                     5 : set(["server1", "server2"]),
1353                  }
1354         # if used_peers is empty, merge_peers should just return the
1355         # first argument unchanged.
1356         self.failUnlessEqual(shares, merge_peers(shares, set([])))
1357         class FakePeerTracker:
1358             pass
1359         trackers = []
1360         for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1361             t = FakePeerTracker()
1362             t.peerid = server
1363             t.buckets = [i]
1364             trackers.append(t)
1365         expected = {
1366                     1 : set(["server1"]),
1367                     2 : set(["server2"]),
1368                     3 : set(["server3"]),
1369                     4 : set(["server4", "server5"]),
1370                     5 : set(["server1", "server2", "server5"]),
1371                     6 : set(["server6"]),
1372                     7 : set(["server7"]),
1373                     8 : set(["server8"]),
1374                    }
1375         self.failUnlessEqual(expected, merge_peers(shares, set(trackers)))
1376         shares2 = {}
1377         expected = {
1378                     5 : set(["server5"]),
1379                     6 : set(["server6"]),
1380                     7 : set(["server7"]),
1381                     8 : set(["server8"]),
1382                    }
1383         self.failUnlessEqual(expected, merge_peers(shares2, set(trackers)))
1384         shares3 = {}
1385         trackers = []
1386         expected = {}
1387         for (i, server) in [(i, "server%d" % i) for i in xrange(10)]:
1388             shares3[i] = set([server])
1389             t = FakePeerTracker()
1390             t.peerid = server
1391             t.buckets = [i]
1392             trackers.append(t)
1393             expected[i] = set([server])
1394         self.failUnlessEqual(expected, merge_peers(shares3, set(trackers)))
1395
1396
1397     def test_servers_of_happiness_utility_function(self):
1398         # These tests are concerned with the servers_of_happiness()
1399         # utility function, and its underlying matching algorithm. Other
1400         # aspects of the servers_of_happiness behavior are tested
1401         # elsewhere. These tests exist to ensure that
1402         # servers_of_happiness doesn't under or overcount the happiness
1403         # value for given inputs.
1404
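        # A quick sanity sketch of the core idea (peerid strings here are
        # illustrative): two shares pinned to the same single server can
        # contribute only one matched pair, so such a layout has happiness 1.
        self.failUnlessEqual(1, servers_of_happiness({
            1 : set(["serverA"]),
            2 : set(["serverA"]),
        }))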
1405         # servers_of_happiness expects a dict of
1406         # shnum => set(peerids) as a preexisting shares argument.
1407         test1 = {
1408                  1 : set(["server1"]),
1409                  2 : set(["server2"]),
1410                  3 : set(["server3"]),
1411                  4 : set(["server4"])
1412                 }
1413         happy = servers_of_happiness(test1)
1414         self.failUnlessEqual(4, happy)
1415         test1[4] = set(["server1"])
1416         # Share 4 is now assigned to server1, which already holds share
1417         # 1, so servers_of_happiness should be 3 instead of 4.
1418         happy = servers_of_happiness(test1)
1419         self.failUnlessEqual(3, happy)
1420         # The second argument of merge_peers should be a set of
1421         # objects with peerid and buckets as attributes. In actual use,
1422         # these will be PeerTracker instances, but for testing it is fine
1423         # to make a FakePeerTracker whose job is to hold those instance
1424         # variables to test that part.
1425         class FakePeerTracker:
1426             pass
1427         trackers = []
1428         for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1429             t = FakePeerTracker()
1430             t.peerid = server
1431             t.buckets = [i]
1432             trackers.append(t)
1433         # Recall that test1 is a server layout with servers_of_happiness
1434         # = 3.  Since there isn't any overlap between the shnum ->
1435         # set([peerid]) correspondences in test1 and those in trackers,
1436         # the result here should be 7.
1437         test2 = merge_peers(test1, set(trackers))
1438         happy = servers_of_happiness(test2)
1439         self.failUnlessEqual(7, happy)
1440         # Now add an overlapping server to trackers. This is redundant,
1441         # so it should not cause the previously reported happiness value
1442         # to change.
1443         t = FakePeerTracker()
1444         t.peerid = "server1"
1445         t.buckets = [1]
1446         trackers.append(t)
1447         test2 = merge_peers(test1, set(trackers))
1448         happy = servers_of_happiness(test2)
1449         self.failUnlessEqual(7, happy)
1450         test = {}
1451         happy = servers_of_happiness(test)
1452         self.failUnlessEqual(0, happy)
1453         # Test a more substantial overlap between the trackers and the
1454         # existing assignments.
1455         test = {
1456             1 : set(['server1']),
1457             2 : set(['server2']),
1458             3 : set(['server3']),
1459             4 : set(['server4']),
1460         }
1461         trackers = []
1462         t = FakePeerTracker()
1463         t.peerid = 'server5'
1464         t.buckets = [4]
1465         trackers.append(t)
1466         t = FakePeerTracker()
1467         t.peerid = 'server6'
1468         t.buckets = [3, 5]
1469         trackers.append(t)
1470         # The value returned by servers_of_happiness is the size
1471         # of a maximum matching in the bipartite graph that
1472         # servers_of_happiness() makes between peerids and share
1473         # numbers. It should find something like this:
1474         # (server 1, share 1)
1475         # (server 2, share 2)
1476         # (server 3, share 3)
1477         # (server 5, share 4)
1478         # (server 6, share 5)
1479         #
1480         # and, since there are 5 edges in this matching, it should
1481         # return 5.
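        # Note that a greedy pairing could match server 6 with share 3 and
        # leave share 5 (whose only holder is server 6) unmatched, for a
        # total of 4; a maximum matching must instead use (server 3, share 3)
        # so that (server 6, share 5) is possible.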
1482         test2 = merge_peers(test, set(trackers))
1483         happy = servers_of_happiness(test2)
1484         self.failUnlessEqual(5, happy)
1485         # Zooko's first puzzle:
1486         # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:156)
1487         #
1488         # server 1: shares 0, 1
1489         # server 2: shares 1, 2
1490         # server 3: share 2
1491         #
1492         # This should yield happiness of 3.
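        # (one maximum matching: (server 1, share 0), (server 2, share 1),
        # (server 3, share 2))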
1493         test = {
1494             0 : set(['server1']),
1495             1 : set(['server1', 'server2']),
1496             2 : set(['server2', 'server3']),
1497         }
1498         self.failUnlessEqual(3, servers_of_happiness(test))
1499         # Zooko's second puzzle:
1500         # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:158)
1501         #
1502         # server 1: shares 0, 1
1503         # server 2: share 1
1504         #
1505         # This should yield happiness of 2.
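        # (one maximum matching: (server 1, share 0), (server 2, share 1))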
1506         test = {
1507             0 : set(['server1']),
1508             1 : set(['server1', 'server2']),
1509         }
1510         self.failUnlessEqual(2, servers_of_happiness(test))
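        # A further sketch in the same spirit: spreading share 1 across two
        # servers doesn't help a layout whose other shares all sit on
        # server 1, so this should also yield happiness of 2.
        test = {
            0 : set(['server1']),
            1 : set(['server1', 'server2']),
            2 : set(['server1']),
        }
        self.failUnlessEqual(2, servers_of_happiness(test))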
1511
1512
1513     def test_shares_by_server(self):
1514         test = dict([(i, set(["server%d" % i])) for i in xrange(1, 5)])
1515         sbs = shares_by_server(test)
1516         self.failUnlessEqual(set([1]), sbs["server1"])
1517         self.failUnlessEqual(set([2]), sbs["server2"])
1518         self.failUnlessEqual(set([3]), sbs["server3"])
1519         self.failUnlessEqual(set([4]), sbs["server4"])
1520         test1 = {
1521                     1 : set(["server1"]),
1522                     2 : set(["server1"]),
1523                     3 : set(["server1"]),
1524                     4 : set(["server2"]),
1525                     5 : set(["server2"])
1526                 }
1527         sbs = shares_by_server(test1)
1528         self.failUnlessEqual(set([1, 2, 3]), sbs["server1"])
1529         self.failUnlessEqual(set([4, 5]), sbs["server2"])
1530         # This should fail unless the peerid part of the mapping is a set
1531         test2 = {1: "server1"}
1532         self.shouldFail(AssertionError,
1533                         "test_shares_by_server",
1534                         "",
1535                         shares_by_server, test2)
1536
1537
1538     def test_existing_share_detection(self):
1539         self.basedir = self.mktemp()
1540         d = self._setup_and_upload()
1541         # Our final setup should look like this:
1542         # server 1: shares 0 - 9, read-only
1543         # server 2: empty
1544         # server 3: empty
1545         # server 4: empty
1546         # The purpose of this test is to make sure that the peer selector
1547         # knows about the shares on server 1, even though it is read-only.
1548         # It used to simply filter these out, which would cause the test
1549         # to fail when servers_of_happiness = 4.
1550         d.addCallback(lambda ign:
1551             self._add_server_with_share(1, 0, True))
1552         d.addCallback(lambda ign:
1553             self._add_server(2))
1554         d.addCallback(lambda ign:
1555             self._add_server(3))
1556         d.addCallback(lambda ign:
1557             self._add_server(4))
1558         def _copy_shares(ign):
1559             for i in xrange(1, 10):
1560                 self._copy_share_to_server(i, 1)
1561         d.addCallback(_copy_shares)
1562         d.addCallback(lambda ign:
1563             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1564         def _prepare_client(ign):
1565             client = self.g.clients[0]
1566             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1567             return client
1568         d.addCallback(_prepare_client)
1569         d.addCallback(lambda client:
1570             client.upload(upload.Data("data" * 10000, convergence="")))
1571         d.addCallback(lambda ign:
1572             self.failUnless(self._has_happy_share_distribution()))
1573         return d
1574
1575
1576     def test_query_counting(self):
1577         # If peer selection fails, Tahoe2PeerSelector prints out a lot
1578         # of helpful diagnostic information, including query stats.
1579         # This test helps make sure that that information is accurate.
1580         self.basedir = self.mktemp()
1581         d = self._setup_and_upload()
1582         def _setup(ign):
1583             for i in xrange(1, 11):
1584                 self._add_server(server_number=i)
1585             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1586             c = self.g.clients[0]
1587             # We set happy to an unsatisfiable value so that we can check the
1588             # counting in the exception message. The same progress message
1589             # is also used when the upload is successful, but in that case it
1590             # only gets written to a log, so we can't see what it says.
1591             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1592             return c
1593         d.addCallback(_setup)
1594         d.addCallback(lambda c:
1595             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1596                             "10 queries placed some shares",
1597                             c.upload, upload.Data("data" * 10000,
1598                                                   convergence="")))
1599         # Now try with some readonly servers. We want to make sure that
1600         # the readonly peer share discovery phase is counted correctly.
1601         def _reset(ign):
1602             self.basedir = self.mktemp()
1603             self.g = None
1604         d.addCallback(_reset)
1605         d.addCallback(lambda ign:
1606             self._setup_and_upload())
1607         def _then(ign):
1608             for i in xrange(1, 11):
1609                 self._add_server(server_number=i)
1610             self._add_server(server_number=11, readonly=True)
1611             self._add_server(server_number=12, readonly=True)
1612             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1613             c = self.g.clients[0]
1614             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1615             return c
1616         d.addCallback(_then)
1617         d.addCallback(lambda c:
1618             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1619                             "2 placed none (of which 2 placed none due to "
1620                             "the server being full",
1621                             c.upload, upload.Data("data" * 10000,
1622                                                   convergence="")))
1623         # Now try the case where the upload process finds a bunch of the
1624         # shares that it wants to place on the first server, including
1625         # the one that it wanted to allocate there. Though no shares will
1626         # be allocated in this request, it should still be called
1627         # productive, since it caused some homeless shares to be
1628         # removed.
1629         d.addCallback(_reset)
1630         d.addCallback(lambda ign:
1631             self._setup_and_upload())
1632
1633         def _next(ign):
1634             for i in xrange(1, 11):
1635                 self._add_server(server_number=i)
1636             # Copy all of the shares to server 9, since that will be
1637             # the first one that the selector sees.
1638             for i in xrange(10):
1639                 self._copy_share_to_server(i, 9)
1640             # Remove server 0, and its contents
1641             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1642             # Make happiness unsatisfiable
1643             c = self.g.clients[0]
1644             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1645             return c
1646         d.addCallback(_next)
1647         d.addCallback(lambda c:
1648             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1649                             "1 queries placed some shares",
1650                             c.upload, upload.Data("data" * 10000,
1651                                                   convergence="")))
1652         return d
1653
1654
1655     def test_upper_limit_on_readonly_queries(self):
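        # The expected failure message below suggests that the peer selector
        # stops surveying read-only servers for existing shares after a
        # bounded number of queries (apparently 2*n = 8 here, with n = 4),
        # even though ten read-only servers are available.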
1656         self.basedir = self.mktemp()
1657         d = self._setup_and_upload()
1658         def _then(ign):
1659             for i in xrange(1, 11):
1660                 self._add_server(server_number=i, readonly=True)
1661             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1662             c = self.g.clients[0]
1663             c.DEFAULT_ENCODING_PARAMETERS['k'] = 2
1664             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1665             c.DEFAULT_ENCODING_PARAMETERS['n'] = 4
1666             return c
1667         d.addCallback(_then)
1668         d.addCallback(lambda client:
1669             self.shouldFail(UploadUnhappinessError,
1670                             "test_upper_limit_on_readonly_queries",
1671                             "sent 8 queries to 8 peers",
1672                             client.upload,
1673                             upload.Data('data' * 10000, convergence="")))
1674         return d
1675
1676
1677     def test_exception_messages_during_peer_selection(self):
1678         # server 1: read-only, no shares
1679         # server 2: read-only, no shares
1680         # server 3: read-only, no shares
1681         # server 4: read-only, no shares
1682         # server 5: read-only, no shares
1683         # This will fail, but we want to make sure that the log messages
1684         # are informative about why it has failed.
1685         self.basedir = self.mktemp()
1686         d = self._setup_and_upload()
1687         d.addCallback(lambda ign:
1688             self._add_server(server_number=1, readonly=True))
1689         d.addCallback(lambda ign:
1690             self._add_server(server_number=2, readonly=True))
1691         d.addCallback(lambda ign:
1692             self._add_server(server_number=3, readonly=True))
1693         d.addCallback(lambda ign:
1694             self._add_server(server_number=4, readonly=True))
1695         d.addCallback(lambda ign:
1696             self._add_server(server_number=5, readonly=True))
1697         d.addCallback(lambda ign:
1698             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1699         def _reset_encoding_parameters(ign, happy=4):
1700             client = self.g.clients[0]
1701             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1702             return client
1703         d.addCallback(_reset_encoding_parameters)
1704         d.addCallback(lambda client:
1705             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1706                             "placed 0 shares out of 10 "
1707                             "total (10 homeless), want to place shares on at "
1708                             "least 4 servers such that any 3 of them have "
1709                             "enough shares to recover the file, "
1710                             "sent 5 queries to 5 peers, 0 queries placed "
1711                             "some shares, 5 placed none "
1712                             "(of which 5 placed none due to the server being "
1713                             "full and 0 placed none due to an error)",
1714                             client.upload,
1715                             upload.Data("data" * 10000, convergence="")))
1716
1717
1718         # server 1: read-only, no shares
1719         # server 2: broken, no shares
1720         # server 3: read-only, no shares
1721         # server 4: read-only, no shares
1722         # server 5: read-only, no shares
1723         def _reset(ign):
1724             self.basedir = self.mktemp()
1725         d.addCallback(_reset)
1726         d.addCallback(lambda ign:
1727             self._setup_and_upload())
1728         d.addCallback(lambda ign:
1729             self._add_server(server_number=1, readonly=True))
1730         d.addCallback(lambda ign:
1731             self._add_server(server_number=2))
1732         def _break_server_2(ign):
1733             server = self.g.servers_by_number[2].my_nodeid
1734             # We have to break the server in servers_by_id,
1735             # because the one in servers_by_number isn't wrapped,
1736             # and doesn't look at its broken attribute when answering
1737             # queries.
1738             self.g.servers_by_id[server].broken = True
1739         d.addCallback(_break_server_2)
1740         d.addCallback(lambda ign:
1741             self._add_server(server_number=3, readonly=True))
1742         d.addCallback(lambda ign:
1743             self._add_server(server_number=4, readonly=True))
1744         d.addCallback(lambda ign:
1745             self._add_server(server_number=5, readonly=True))
1746         d.addCallback(lambda ign:
1747             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1748         d.addCallback(_reset_encoding_parameters)
1749         d.addCallback(lambda client:
1750             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1751                             "placed 0 shares out of 10 "
1752                             "total (10 homeless), want to place shares on at "
1753                             "least 4 servers such that any 3 of them have "
1754                             "enough shares to recover the file, "
1755                             "sent 5 queries to 5 peers, 0 queries placed "
1756                             "some shares, 5 placed none "
1757                             "(of which 4 placed none due to the server being "
1758                             "full and 1 placed none due to an error)",
1759                             client.upload,
1760                             upload.Data("data" * 10000, convergence="")))
1761         # server 0, server 1 = empty, accepting shares
1762         # This should place all of the shares, but still fail with happy=4.
1763         # We want to make sure that the exception message is worded correctly.
1764         d.addCallback(_reset)
1765         d.addCallback(lambda ign:
1766             self._setup_grid())
1767         d.addCallback(lambda ign:
1768             self._add_server(server_number=1))
1769         d.addCallback(_reset_encoding_parameters)
1770         d.addCallback(lambda client:
1771             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1772                             "shares could be placed or found on only 2 "
1773                             "server(s). We were asked to place shares on at "
1774                             "least 4 server(s) such that any 3 of them have "
1775                             "enough shares to recover the file.",
1776                             client.upload, upload.Data("data" * 10000,
1777                                                        convergence="")))
1778         # servers 0 - 4 = empty, accepting shares
1779         # This too should place all the shares, and this too should fail,
1780         # but since the effective happiness is more than the k encoding
1781         # parameter, it should trigger a different error message than the one
1782         # above.
1783         d.addCallback(_reset)
1784         d.addCallback(lambda ign:
1785             self._setup_grid())
1786         d.addCallback(lambda ign:
1787             self._add_server(server_number=1))
1788         d.addCallback(lambda ign:
1789             self._add_server(server_number=2))
1790         d.addCallback(lambda ign:
1791             self._add_server(server_number=3))
1792         d.addCallback(lambda ign:
1793             self._add_server(server_number=4))
1794         d.addCallback(_reset_encoding_parameters, happy=7)
1795         d.addCallback(lambda client:
1796             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1797                             "shares could be placed on only 5 server(s) such "
1798                             "that any 3 of them have enough shares to recover "
1799                             "the file, but we were asked to place shares on "
1800                             "at least 7 such servers.",
1801                             client.upload, upload.Data("data" * 10000,
1802                                                        convergence="")))
1803         # server 0: shares 0 - 9
1804         # server 1: share 0, read-only
1805         # server 2: share 0, read-only
1806         # server 3: share 0, read-only
1807         # This should place all of the shares, but fail with happy=4.
1808         # Since the number of servers with shares is more than the number
1809         # necessary to reconstitute the file, this will trigger a different
1810         # error message than either of those above.
1811         d.addCallback(_reset)
1812         d.addCallback(lambda ign:
1813             self._setup_and_upload())
1814         d.addCallback(lambda ign:
1815             self._add_server_with_share(server_number=1, share_number=0,
1816                                         readonly=True))
1817         d.addCallback(lambda ign:
1818             self._add_server_with_share(server_number=2, share_number=0,
1819                                         readonly=True))
1820         d.addCallback(lambda ign:
1821             self._add_server_with_share(server_number=3, share_number=0,
1822                                         readonly=True))
1823         d.addCallback(_reset_encoding_parameters, happy=7)
1824         d.addCallback(lambda client:
1825             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1826                             "shares could be placed or found on 4 server(s), "
1827                             "but they are not spread out evenly enough to "
1828                             "ensure that any 3 of these servers would have "
1829                             "enough shares to recover the file. We were asked "
1830                             "to place shares on at least 7 servers such that "
1831                             "any 3 of them have enough shares to recover the "
1832                             "file",
1833                             client.upload, upload.Data("data" * 10000,
1834                                                        convergence="")))
1835         return d
1836
1837
1838     def test_problem_layout_comment_187(self):
1839         # #778 comment 187 broke an initial attempt at a share
1840         # redistribution algorithm. This test is here to demonstrate the
1841         # breakage, and to test that subsequent algorithms don't also
1842         # break in the same way.
1843         self.basedir = self.mktemp()
1844         d = self._setup_and_upload(k=2, n=3)
1845
1846         # server 1: shares 0, 1, 2, readonly
1847         # server 2: share 0, readonly
1848         # server 3: share 0
1849         def _setup(ign):
1850             self._add_server_with_share(server_number=1, share_number=0,
1851                                         readonly=True)
1852             self._add_server_with_share(server_number=2, share_number=0,
1853                                         readonly=True)
1854             self._add_server_with_share(server_number=3, share_number=0)
1855             # Copy shares
1856             self._copy_share_to_server(1, 1)
1857             self._copy_share_to_server(2, 1)
1858             # Remove server 0
1859             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1860             client = self.g.clients[0]
1861             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 3
1862             return client
1863
1864         d.addCallback(_setup)
1865         d.addCallback(lambda client:
1866             client.upload(upload.Data("data" * 10000, convergence="")))
1867         d.addCallback(lambda ign:
1868             self.failUnless(self._has_happy_share_distribution()))
1869         return d
1870     test_problem_layout_comment_187.todo = "this isn't fixed yet"
1871
1872     def test_problem_layout_ticket_1118(self):
1873         # #1118 includes a report from a user who hit an assertion in
1874         # the upload code with this layout.
1875         self.basedir = self.mktemp()
1876         d = self._setup_and_upload(k=2, n=4)
1877
1878         # server 0: no shares
1879         # server 1: shares 0, 3
1880         # server 3: share 1
1881         # server 2: share 2
1882         # The order in which they are queried is 0, 1, 3, 2.
1883         def _setup(ign):
1884             self._add_server(server_number=0)
1885             self._add_server_with_share(server_number=1, share_number=0)
1886             self._add_server_with_share(server_number=2, share_number=2)
1887             self._add_server_with_share(server_number=3, share_number=1)
1888             # Copy shares
1889             self._copy_share_to_server(3, 1)
1890             storedir = self.get_serverdir(0)
1891             # remove the storedir, wiping out any existing shares
1892             shutil.rmtree(storedir)
1893             # create an empty storedir to replace the one we just removed
1894             os.mkdir(storedir)
1895             client = self.g.clients[0]
1896             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1897             return client
1898
1899         d.addCallback(_setup)
1900         d.addCallback(lambda client:
1901                           client.upload(upload.Data("data" * 10000, convergence="")))
1902         d.addCallback(lambda ign:
1903             self.failUnless(self._has_happy_share_distribution()))
1904         return d
1905
1906     def test_upload_succeeds_with_some_homeless_shares(self):
1907         # If the upload is forced to stop trying to place shares before
1908         # it has placed (or otherwise accounted for) all of them, but it
1909         # has placed enough to satisfy the upload health criteria that
1910         # we're using, it should still succeed.
1911         self.basedir = self.mktemp()
1912         d = self._setup_and_upload()
1913         def _server_setup(ign):
1914             # Add four servers so that we have a layout like this:
1915             # server 1: share 0, read-only
1916             # server 2: share 1, read-only
1917             # server 3: share 2, read-only
1918             # server 4: share 3, read-only
1919             # If we set happy = 4, the upload will manage to satisfy
1920             # servers of happiness, but not place all of the shares; we
1921             # want to test that the upload is declared successful in
1922             # this case.
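            # Happiness arithmetic: four servers each holding one distinct
            # share gives a matching of size 4, which meets happy = 4 even
            # though shares 4-9 remain homeless.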
1923             self._add_server_with_share(server_number=1, share_number=0,
1924                                         readonly=True)
1925             self._add_server_with_share(server_number=2, share_number=1,
1926                                         readonly=True)
1927             self._add_server_with_share(server_number=3, share_number=2,
1928                                         readonly=True)
1929             self._add_server_with_share(server_number=4, share_number=3,
1930                                         readonly=True)
1931             # Remove server 0.
1932             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1933             # Set the client appropriately
1934             c = self.g.clients[0]
1935             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1936             return c
1937         d.addCallback(_server_setup)
1938         d.addCallback(lambda client:
1939             client.upload(upload.Data("data" * 10000, convergence="")))
1940         d.addCallback(lambda ign:
1941             self.failUnless(self._has_happy_share_distribution()))
1942         return d
1943
1944
1945     def test_uploader_skips_over_servers_with_only_one_share(self):
1946         # We want to make sure that the redistribution logic ignores
1947         # servers with only one share, since placing these shares
1948         # elsewhere will at best keep happiness the same as it was, and
1949         # at worst hurt it.
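        # Rationale: moving a lone share from server A to an empty server B
        # unmatches A and matches B, so the matching size cannot grow; only
        # a server holding several shares can donate one without shrinking
        # the matching.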
1950         self.basedir = self.mktemp()
1951         d = self._setup_and_upload()
1952         def _server_setup(ign):
1953             # Add some servers so that the upload will need to
1954             # redistribute, but will first pass over a couple of servers
1955             # that hold only a single share before finding one that has
1956             # shares to spare.
1957             self._add_server_with_share(server_number=1, share_number=0)
1958             self._add_server_with_share(server_number=2, share_number=2)
1959             self._add_server_with_share(server_number=3, share_number=1)
1960             self._add_server_with_share(server_number=8, share_number=4)
1961             self._add_server_with_share(server_number=5, share_number=5)
1962             self._add_server_with_share(server_number=10, share_number=7)
1963             for i in xrange(4):
1964                 self._copy_share_to_server(i, 2)
1965             return self.g.clients[0]
1966         d.addCallback(_server_setup)
1967         d.addCallback(lambda client:
1968             client.upload(upload.Data("data" * 10000, convergence="")))
1969         d.addCallback(lambda ign:
1970             self.failUnless(self._has_happy_share_distribution()))
1971         return d
1972
1973
1974     def test_peer_selector_bucket_abort(self):
1975         # If peer selection for an upload fails due to an unhappy
1976         # layout, the peer selection process should abort the buckets it
1977         # allocates before failing, so that the space can be re-used.
1978         self.basedir = self.mktemp()
1979         self.set_up_grid(num_servers=5)
1980
1981         # Try to upload a file with happy=7, which is unsatisfiable with
1982         # the current grid. This will fail, but should not take up any
1983         # space on the storage servers after it fails.
1984         client = self.g.clients[0]
1985         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
1986         d = defer.succeed(None)
1987         d.addCallback(lambda ignored:
1988             self.shouldFail(UploadUnhappinessError,
1989                             "test_peer_selector_bucket_abort",
1990                             "",
1991                             client.upload, upload.Data("data" * 10000,
1992                                                        convergence="")))
1993         # wait for the abort messages to get there.
1994         def _turn_barrier(res):
1995             return fireEventually(res)
1996         d.addCallback(_turn_barrier)
1997         def _then(ignored):
1998             for server in self.g.servers_by_number.values():
1999                 self.failUnlessEqual(server.allocated_size(), 0)
2000         d.addCallback(_then)
2001         return d
2002
2003
2004     def test_encoder_bucket_abort(self):
2005         # If enough servers die in the process of encoding and uploading
2006         # a file to make the layout unhappy, we should cancel the
2007         # newly-allocated buckets before dying.
2008         self.basedir = self.mktemp()
2009         self.set_up_grid(num_servers=4)
2010
2011         client = self.g.clients[0]
2012         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
2013
2014         d = defer.succeed(None)
2015         d.addCallback(lambda ignored:
2016             self.shouldFail(UploadUnhappinessError,
2017                             "test_encoder_bucket_abort",
2018                             "",
2019                             self._do_upload_with_broken_servers, 1))
2020         def _turn_barrier(res):
2021             return fireEventually(res)
2022         d.addCallback(_turn_barrier)
2023         def _then(ignored):
2024             for server in self.g.servers_by_number.values():
2025                 self.failUnlessEqual(server.allocated_size(), 0)
2026         d.addCallback(_then)
2027         return d
2028
2029
2030     def _set_up_nodes_extra_config(self, clientdir):
2031         cfgfn = os.path.join(clientdir, "tahoe.cfg")
2032         oldcfg = open(cfgfn, "r").read()
2033         f = open(cfgfn, "wt")
2034         f.write(oldcfg)
2035         f.write("\n")
2036         f.write("[client]\n")
2037         f.write("shares.needed = 7\n")
2038         f.write("shares.total = 12\n")
2039         f.write("\n")
2040         f.close()
2041         return None
2042
2043 # TODO:
2044 #  upload with exactly 75 peers (shares_of_happiness)
2045 #  have a download fail
2046 #  cancel a download (need to implement more cancel stuff)