1 # -*- coding: utf-8 -*-
2
3 import os, shutil
4 from cStringIO import StringIO
5 from twisted.trial import unittest
6 from twisted.python.failure import Failure
7 from twisted.internet import defer
8 from foolscap.api import fireEventually
9
10 import allmydata # for __full_version__
11 from allmydata import uri, monitor, client
12 from allmydata.immutable import upload, encode
13 from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
14 from allmydata.util import log
15 from allmydata.util.assertutil import precondition
16 from allmydata.util.deferredutil import DeferredListShouldSucceed
17 from allmydata.test.no_network import GridTestMixin
18 from allmydata.test.common_util import ShouldFailMixin
19 from allmydata.util.happinessutil import servers_of_happiness, \
20                                          shares_by_server, merge_peers
21 from allmydata.storage_client import StorageFarmBroker
22 from allmydata.storage.server import storage_index_to_dir
23
24 MiB = 1024*1024
25
26 def extract_uri(results):
27     return results.uri
28
29 # Some of these took longer than 480 seconds on Zandr's arm box, but this may
30 # have been due to an earlier test ERROR'ing out due to timeout, which seems
31 # to screw up subsequent tests.
32 timeout = 960
33
34 class Uploadable(unittest.TestCase):
35     def shouldEqual(self, data, expected):
36         self.failUnless(isinstance(data, list))
37         for e in data:
38             self.failUnless(isinstance(e, str))
39         s = "".join(data)
40         self.failUnlessEqual(s, expected)
41
42     def test_filehandle_random_key(self):
43         return self._test_filehandle(convergence=None)
44
45     def test_filehandle_convergent_encryption(self):
46         return self._test_filehandle(convergence="some convergence string")
47
48     def _test_filehandle(self, convergence):
49         s = StringIO("a"*41)
50         u = upload.FileHandle(s, convergence=convergence)
51         d = u.get_size()
52         d.addCallback(self.failUnlessEqual, 41)
53         d.addCallback(lambda res: u.read(1))
54         d.addCallback(self.shouldEqual, "a")
55         d.addCallback(lambda res: u.read(80))
56         d.addCallback(self.shouldEqual, "a"*40)
57         d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
58         d.addCallback(lambda res: s.close()) # that privilege is reserved for us
59         return d
60
61     def test_filename(self):
62         basedir = "upload/Uploadable/test_filename"
63         os.makedirs(basedir)
64         fn = os.path.join(basedir, "file")
65         f = open(fn, "w")
66         f.write("a"*41)
67         f.close()
68         u = upload.FileName(fn, convergence=None)
69         d = u.get_size()
70         d.addCallback(self.failUnlessEqual, 41)
71         d.addCallback(lambda res: u.read(1))
72         d.addCallback(self.shouldEqual, "a")
73         d.addCallback(lambda res: u.read(80))
74         d.addCallback(self.shouldEqual, "a"*40)
75         d.addCallback(lambda res: u.close())
76         return d
77
78     def test_data(self):
79         s = "a"*41
80         u = upload.Data(s, convergence=None)
81         d = u.get_size()
82         d.addCallback(self.failUnlessEqual, 41)
83         d.addCallback(lambda res: u.read(1))
84         d.addCallback(self.shouldEqual, "a")
85         d.addCallback(lambda res: u.read(80))
86         d.addCallback(self.shouldEqual, "a"*40)
87         d.addCallback(lambda res: u.close())
88         return d
89
90 class ServerError(Exception):
91     pass
92
93 class SetDEPMixin:
94     def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
95         p = {"k": k,
96              "happy": happy,
97              "n": n,
98              "max_segment_size": max_segsize,
99              }
100         self.node.DEFAULT_ENCODING_PARAMETERS = p
101
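# A minimal usage sketch for the mixin above (illustrative only; the class
# below is hypothetical and deliberately left commented out so it never runs):
#
#   class _ExampleDEPTest(unittest.TestCase, SetDEPMixin):
#       def setUp(self):
#           self.node = FakeClient(mode="good")  # FakeClient is defined below
#       def test_example(self):
#           self.set_encoding_parameters(k=3, happy=7, n=10)
#           # self.node.DEFAULT_ENCODING_PARAMETERS is now
#           # {"k": 3, "happy": 7, "n": 10, "max_segment_size": 1*MiB}
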
102 class FakeStorageServer:
103     def __init__(self, mode):
104         self.mode = mode
105         self.allocated = []
106         self.queries = 0
107         self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
108                          { "maximum-immutable-share-size": 2**32 },
109                          "application-version": str(allmydata.__full_version__),
110                          }
111         if mode == "small":
112             self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
113                              { "maximum-immutable-share-size": 10 },
114                              "application-version": str(allmydata.__full_version__),
115                              }
116
117
118     def callRemote(self, methname, *args, **kwargs):
119         def _call():
120             meth = getattr(self, methname)
121             return meth(*args, **kwargs)
122         d = fireEventually()
123         d.addCallback(lambda res: _call())
124         return d
125
126     def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
127                          sharenums, share_size, canary):
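        # Mimics the contract of the real remote allocate_buckets(): return a
        # 2-tuple of (alreadygot, bucketwriters), where 'alreadygot' is the set
        # of share numbers this server already holds and 'bucketwriters' maps
        # each newly allocated share number to a writer for it.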
128         #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
129         if self.mode == "first-fail":
130             if self.queries == 0:
131                 raise ServerError
132         if self.mode == "second-fail":
133             if self.queries == 1:
134                 raise ServerError
135         self.queries += 1
136         if self.mode == "full":
137             return (set(), {},)
138         elif self.mode == "already got them":
139             return (set(sharenums), {},)
140         else:
141             for shnum in sharenums:
142                 self.allocated.append( (storage_index, shnum) )
143             return (set(),
144                     dict([( shnum, FakeBucketWriter(share_size) )
145                           for shnum in sharenums]),
146                     )
147
148 class FakeBucketWriter:
149     # a diagnostic version of storageserver.BucketWriter
150     def __init__(self, size):
151         self.data = StringIO()
152         self.closed = False
153         self._size = size
154
155     def callRemote(self, methname, *args, **kwargs):
156         def _call():
157             meth = getattr(self, "remote_" + methname)
158             return meth(*args, **kwargs)
159         d = fireEventually()
160         d.addCallback(lambda res: _call())
161         return d
162
163
164     def callRemoteOnly(self, methname, *args, **kwargs):
165         d = self.callRemote(methname, *args, **kwargs)
166         del d # callRemoteOnly ignores this
167         return None
168
169
170     def remote_write(self, offset, data):
171         precondition(not self.closed)
172         precondition(offset >= 0)
173         precondition(offset+len(data) <= self._size,
174                      "offset=%d + data=%d > size=%d" %
175                      (offset, len(data), self._size))
176         self.data.seek(offset)
177         self.data.write(data)
178
179     def remote_close(self):
180         precondition(not self.closed)
181         self.closed = True
182
183     def remote_abort(self):
184         pass
185
186 class FakeClient:
187     DEFAULT_ENCODING_PARAMETERS = {"k":25,
188                                    "happy": 25,
189                                    "n": 100,
190                                    "max_segment_size": 1*MiB,
191                                    }
192     def __init__(self, mode="good", num_servers=50):
193         self.num_servers = num_servers
194         if type(mode) is str:
195             mode = dict([(i, mode) for i in range(num_servers)]) # same mode for every server
196         servers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
197                     for fakeid in range(self.num_servers) ]
198         self.storage_broker = StorageFarmBroker(None, permute_peers=True)
199         for (serverid, rref) in servers:
200             self.storage_broker.test_add_rref(serverid, rref)
201         self.last_servers = [s[1] for s in servers]
202
203     def log(self, *args, **kwargs):
204         pass
205     def get_encoding_parameters(self):
206         return self.DEFAULT_ENCODING_PARAMETERS
207     def get_storage_broker(self):
208         return self.storage_broker
209     _secret_holder = client.SecretHolder("lease secret", "convergence secret")
210
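# A small sketch of the two ways 'mode' can be passed to FakeClient above
# (illustrative only; this helper is never called by the suite):
def _example_fake_client_modes():
    # a single string gives every fake server the same behaviour
    all_good = FakeClient(mode="good", num_servers=10)
    # a dict keyed by server number mixes behaviours, as the ServerErrors
    # tests below do
    mixed = FakeClient(mode=dict([(0, "good")] +
                                 [(i, "first-fail") for i in range(1, 10)]),
                       num_servers=10)
    return all_good, mixed
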
211 class GotTooFarError(Exception):
212     pass
213
214 class GiganticUploadable(upload.FileHandle):
215     def __init__(self, size):
216         self._size = size
217         self._fp = 0
218
219     def get_encryption_key(self):
220         return defer.succeed("\x00" * 16)
221     def get_size(self):
222         return defer.succeed(self._size)
223     def read(self, length):
224         left = self._size - self._fp
225         length = min(left, length)
226         self._fp += length
227         if self._fp > 1000000:
228             # terminate the test early.
229             raise GotTooFarError("we shouldn't be allowed to get this far")
230         return defer.succeed(["\x00" * length])
231     def close(self):
232         pass
233
234 DATA = """
235 Once upon a time, there was a beautiful princess named Buttercup. She lived
236 in a magical land where every file was stored securely among millions of
237 machines, and nobody ever worried about their data being lost ever again.
238 The End.
239 """
240 assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
241
242 SIZE_ZERO = 0
243 SIZE_SMALL = 16
244 SIZE_LARGE = len(DATA)
245
246 def upload_data(uploader, data):
247     u = upload.Data(data, convergence=None)
248     return uploader.upload(u)
249 def upload_filename(uploader, filename):
250     u = upload.FileName(filename, convergence=None)
251     return uploader.upload(u)
252 def upload_filehandle(uploader, fh):
253     u = upload.FileHandle(fh, convergence=None)
254     return uploader.upload(u)
255
256 class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
257     def setUp(self):
258         self.node = FakeClient(mode="good")
259         self.u = upload.Uploader()
260         self.u.running = True
261         self.u.parent = self.node
262
263     def _check_small(self, newuri, size):
264         u = uri.from_string(newuri)
265         self.failUnless(isinstance(u, uri.LiteralFileURI))
266         self.failUnlessEqual(len(u.data), size)
267
268     def _check_large(self, newuri, size):
269         u = uri.from_string(newuri)
270         self.failUnless(isinstance(u, uri.CHKFileURI))
271         self.failUnless(isinstance(u.get_storage_index(), str))
272         self.failUnlessEqual(len(u.get_storage_index()), 16)
273         self.failUnless(isinstance(u.key, str))
274         self.failUnlessEqual(len(u.key), 16)
275         self.failUnlessEqual(u.size, size)
276
277     def get_data(self, size):
278         return DATA[:size]
279
280     def test_too_large(self):
281         # we've removed the 4GiB share size limit (see ticket #346 for
282         # details), but still have an 8-byte field, so the limit is now
283         # 2**64, so make sure we reject files larger than that.
284         k = 3; happy = 7; n = 10
285         self.set_encoding_parameters(k, happy, n)
286         big = k*(2**64)
287         data1 = GiganticUploadable(big)
288         d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
289                             "This file is too large to be uploaded (data_size)",
290                             self.u.upload, data1)
291         data2 = GiganticUploadable(big-3)
292         d.addCallback(lambda res:
293                       self.shouldFail(FileTooLargeError,
294                                       "test_too_large-data2",
295                                       "This file is too large to be uploaded (offsets)",
296                                       self.u.upload, data2))
297         # I don't know where the actual limit is... it depends upon how large
298         # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
299         return d
300
301     def test_data_zero(self):
302         data = self.get_data(SIZE_ZERO)
303         d = upload_data(self.u, data)
304         d.addCallback(extract_uri)
305         d.addCallback(self._check_small, SIZE_ZERO)
306         return d
307
308     def test_data_small(self):
309         data = self.get_data(SIZE_SMALL)
310         d = upload_data(self.u, data)
311         d.addCallback(extract_uri)
312         d.addCallback(self._check_small, SIZE_SMALL)
313         return d
314
315     def test_data_large(self):
316         data = self.get_data(SIZE_LARGE)
317         d = upload_data(self.u, data)
318         d.addCallback(extract_uri)
319         d.addCallback(self._check_large, SIZE_LARGE)
320         return d
321
322     def test_data_large_odd_segments(self):
323         data = self.get_data(SIZE_LARGE)
324         segsize = int(SIZE_LARGE / 2.5)
325         # we want 3 segments, since that's not a power of two
326         self.set_encoding_parameters(25, 25, 100, segsize)
327         d = upload_data(self.u, data)
328         d.addCallback(extract_uri)
329         d.addCallback(self._check_large, SIZE_LARGE)
330         return d
331
332     def test_filehandle_zero(self):
333         data = self.get_data(SIZE_ZERO)
334         d = upload_filehandle(self.u, StringIO(data))
335         d.addCallback(extract_uri)
336         d.addCallback(self._check_small, SIZE_ZERO)
337         return d
338
339     def test_filehandle_small(self):
340         data = self.get_data(SIZE_SMALL)
341         d = upload_filehandle(self.u, StringIO(data))
342         d.addCallback(extract_uri)
343         d.addCallback(self._check_small, SIZE_SMALL)
344         return d
345
346     def test_filehandle_large(self):
347         data = self.get_data(SIZE_LARGE)
348         d = upload_filehandle(self.u, StringIO(data))
349         d.addCallback(extract_uri)
350         d.addCallback(self._check_large, SIZE_LARGE)
351         return d
352
353     def test_filename_zero(self):
354         fn = "Uploader-test_filename_zero.data"
355         f = open(fn, "wb")
356         data = self.get_data(SIZE_ZERO)
357         f.write(data)
358         f.close()
359         d = upload_filename(self.u, fn)
360         d.addCallback(extract_uri)
361         d.addCallback(self._check_small, SIZE_ZERO)
362         return d
363
364     def test_filename_small(self):
365         fn = "Uploader-test_filename_small.data"
366         f = open(fn, "wb")
367         data = self.get_data(SIZE_SMALL)
368         f.write(data)
369         f.close()
370         d = upload_filename(self.u, fn)
371         d.addCallback(extract_uri)
372         d.addCallback(self._check_small, SIZE_SMALL)
373         return d
374
375     def test_filename_large(self):
376         fn = "Uploader-test_filename_large.data"
377         f = open(fn, "wb")
378         data = self.get_data(SIZE_LARGE)
379         f.write(data)
380         f.close()
381         d = upload_filename(self.u, fn)
382         d.addCallback(extract_uri)
383         d.addCallback(self._check_large, SIZE_LARGE)
384         return d
385
386 class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
387     def make_node(self, mode, num_servers=10):
388         self.node = FakeClient(mode, num_servers)
389         self.u = upload.Uploader()
390         self.u.running = True
391         self.u.parent = self.node
392
393     def _check_large(self, newuri, size):
394         u = uri.from_string(newuri)
395         self.failUnless(isinstance(u, uri.CHKFileURI))
396         self.failUnless(isinstance(u.get_storage_index(), str))
397         self.failUnlessEqual(len(u.get_storage_index()), 16)
398         self.failUnless(isinstance(u.key, str))
399         self.failUnlessEqual(len(u.key), 16)
400         self.failUnlessEqual(u.size, size)
401
402     def test_first_error(self):
403         mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
404         self.make_node(mode)
405         self.set_encoding_parameters(k=25, happy=1, n=50)
406         d = upload_data(self.u, DATA)
407         d.addCallback(extract_uri)
408         d.addCallback(self._check_large, SIZE_LARGE)
409         return d
410
411     def test_first_error_all(self):
412         self.make_node("first-fail")
413         d = self.shouldFail(UploadUnhappinessError, "first_error_all",
414                             "server selection failed",
415                             upload_data, self.u, DATA)
416         def _check((f,)):
417             self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
418             # there should also be a 'last failure was' message
419             self.failUnlessIn("ServerError", str(f.value))
420         d.addCallback(_check)
421         return d
422
423     def test_second_error(self):
424         # we want to make sure we make it to a third pass. This means that
425         # the first pass was insufficient to place all shares, and at least
426         # one of the second-pass servers (other than the last one) accepted a
427         # share (so we'll believe that a third pass will be useful). (if
428         # everyone but the last server throws an error, then we'll send all
429         # the remaining shares to the last server at the end of the second
430         # pass, and if that succeeds, we won't make it to a third pass).
431         #
432         # we can achieve this 97.5% of the time by using 40 servers, having 39
433         # of them fail on the second request and only one succeed; the only losing
434         # case is the lone good server being permuted last (1/40 = 2.5%). (we need
435         # to keep the number of servers low enough to ensure a second pass with 100 shares).
436         mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
437         self.make_node(mode, 40)
438         d = upload_data(self.u, DATA)
439         d.addCallback(extract_uri)
440         d.addCallback(self._check_large, SIZE_LARGE)
441         return d
442
443     def test_second_error_all(self):
444         self.make_node("second-fail")
445         d = self.shouldFail(UploadUnhappinessError, "second_error_all",
446                             "server selection failed",
447                             upload_data, self.u, DATA)
448         def _check((f,)):
449             self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
450             # there should also be a 'last failure was' message
451             self.failUnlessIn("ServerError", str(f.value))
452         d.addCallback(_check)
453         return d
454
455 class FullServer(unittest.TestCase):
456     def setUp(self):
457         self.node = FakeClient(mode="full")
458         self.u = upload.Uploader()
459         self.u.running = True
460         self.u.parent = self.node
461
462     def _should_fail(self, f):
463         self.failUnless(isinstance(f, Failure) and f.check(UploadUnhappinessError), f)
464
465     def test_data_large(self):
466         data = DATA
467         d = upload_data(self.u, data)
468         d.addBoth(self._should_fail)
469         return d
470
471 class ServerSelection(unittest.TestCase):
472
473     def make_client(self, num_servers=50):
474         self.node = FakeClient(mode="good", num_servers=num_servers)
475         self.u = upload.Uploader()
476         self.u.running = True
477         self.u.parent = self.node
478
479     def get_data(self, size):
480         return DATA[:size]
481
482     def _check_large(self, newuri, size):
483         u = uri.from_string(newuri)
484         self.failUnless(isinstance(u, uri.CHKFileURI))
485         self.failUnless(isinstance(u.get_storage_index(), str))
486         self.failUnlessEqual(len(u.get_storage_index()), 16)
487         self.failUnless(isinstance(u.key, str))
488         self.failUnlessEqual(len(u.key), 16)
489         self.failUnlessEqual(u.size, size)
490
491     def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
492         p = {"k": k,
493              "happy": happy,
494              "n": n,
495              "max_segment_size": max_segsize,
496              }
497         self.node.DEFAULT_ENCODING_PARAMETERS = p
498
499     def test_one_each(self):
500         # if we have 50 shares, and there are 50 servers, and they all accept
501         # a share, we should get exactly one share per server
502
503         self.make_client()
504         data = self.get_data(SIZE_LARGE)
505         self.set_encoding_parameters(25, 30, 50)
506         d = upload_data(self.u, data)
507         d.addCallback(extract_uri)
508         d.addCallback(self._check_large, SIZE_LARGE)
509         def _check(res):
510             for s in self.node.last_servers:
511                 allocated = s.allocated
512                 self.failUnlessEqual(len(allocated), 1)
513                 self.failUnlessEqual(s.queries, 1)
514         d.addCallback(_check)
515         return d
516
517     def test_two_each(self):
518         # if we have 100 shares, and there are 50 servers, and they all
519         # accept all shares, we should get exactly two shares per server
520
521         self.make_client()
522         data = self.get_data(SIZE_LARGE)
523         # if there are 50 servers, then happy needs to be <= 50
524         self.set_encoding_parameters(50, 50, 100)
525         d = upload_data(self.u, data)
526         d.addCallback(extract_uri)
527         d.addCallback(self._check_large, SIZE_LARGE)
528         def _check(res):
529             for s in self.node.last_servers:
530                 allocated = s.allocated
531                 self.failUnlessEqual(len(allocated), 2)
532                 self.failUnlessEqual(s.queries, 2)
533         d.addCallback(_check)
534         return d
535
536     def test_one_each_plus_one_extra(self):
537         # if we have 51 shares, and there are 50 servers, then one server
538         # gets two shares and the rest get just one
539
540         self.make_client()
541         data = self.get_data(SIZE_LARGE)
542         self.set_encoding_parameters(24, 41, 51)
543         d = upload_data(self.u, data)
544         d.addCallback(extract_uri)
545         d.addCallback(self._check_large, SIZE_LARGE)
546         def _check(res):
547             got_one = []
548             got_two = []
549             for s in self.node.last_servers:
550                 allocated = s.allocated
551                 self.failUnless(len(allocated) in (1,2), len(allocated))
552                 if len(allocated) == 1:
553                     self.failUnlessEqual(s.queries, 1)
554                     got_one.append(s)
555                 else:
556                     self.failUnlessEqual(s.queries, 2)
557                     got_two.append(s)
558             self.failUnlessEqual(len(got_one), 49)
559             self.failUnlessEqual(len(got_two), 1)
560         d.addCallback(_check)
561         return d
562
563     def test_four_each(self):
564         # if we have 200 shares, and there are 50 servers, then each server
565         # gets 4 shares. The design goal is to accomplish this with only two
566         # queries per server.
567
568         self.make_client()
569         data = self.get_data(SIZE_LARGE)
570         # if there are 50 servers, then happy should be no more than 50 if we
571         # want this to work.
572         self.set_encoding_parameters(100, 50, 200)
573         d = upload_data(self.u, data)
574         d.addCallback(extract_uri)
575         d.addCallback(self._check_large, SIZE_LARGE)
576         def _check(res):
577             for s in self.node.last_servers:
578                 allocated = s.allocated
579                 self.failUnlessEqual(len(allocated), 4)
580                 self.failUnlessEqual(s.queries, 2)
581         d.addCallback(_check)
582         return d
583
584     def test_three_of_ten(self):
585         # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
586         # 4+4+2
587
588         self.make_client(3)
589         data = self.get_data(SIZE_LARGE)
590         self.set_encoding_parameters(3, 3, 10)
591         d = upload_data(self.u, data)
592         d.addCallback(extract_uri)
593         d.addCallback(self._check_large, SIZE_LARGE)
594         def _check(res):
595             counts = {}
596             for s in self.node.last_servers:
597                 allocated = s.allocated
598                 counts[len(allocated)] = counts.get(len(allocated), 0) + 1
599             histogram = [counts.get(i, 0) for i in range(5)]
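            # i.e. two servers ended up with 3 shares each and one with 4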
600             self.failUnlessEqual(histogram, [0,0,0,2,1])
601         d.addCallback(_check)
602         return d
603
604     def test_some_big_some_small(self):
605         # 10 shares, 20 servers, but half the servers don't support a
606         # share-size large enough for our file
607         mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)]) # even-numbered servers are "good", odd ones "small"
608         self.node = FakeClient(mode, num_servers=20)
609         self.u = upload.Uploader()
610         self.u.running = True
611         self.u.parent = self.node
612
613         data = self.get_data(SIZE_LARGE)
614         self.set_encoding_parameters(3, 5, 10)
615         d = upload_data(self.u, data)
616         d.addCallback(extract_uri)
617         d.addCallback(self._check_large, SIZE_LARGE)
618         def _check(res):
619             # we should have put one share each on the big servers, and zero
620             # shares on the small servers
621             total_allocated = 0
622             for p in self.node.last_servers:
623                 if p.mode == "good":
624                     self.failUnlessEqual(len(p.allocated), 1)
625                 elif p.mode == "small":
626                     self.failUnlessEqual(len(p.allocated), 0)
627                 total_allocated += len(p.allocated)
628             self.failUnlessEqual(total_allocated, 10)
629         d.addCallback(_check)
630         return d
631
632
633 class StorageIndex(unittest.TestCase):
634     def test_params_must_matter(self):
635         DATA = "I am some data"
636         u = upload.Data(DATA, convergence="")
637         eu = upload.EncryptAnUploadable(u)
638         d1 = eu.get_storage_index()
639
640         # CHK means the same data should encrypt the same way
641         u = upload.Data(DATA, convergence="")
642         eu = upload.EncryptAnUploadable(u)
643         d1a = eu.get_storage_index()
644
645         # but if we use a different convergence string it should be different
646         u = upload.Data(DATA, convergence="wheee!")
647         eu = upload.EncryptAnUploadable(u)
648         d1salt1 = eu.get_storage_index()
649
650         # and if we add yet a different convergence it should be different again
651         u = upload.Data(DATA, convergence="NOT wheee!")
652         eu = upload.EncryptAnUploadable(u)
653         d1salt2 = eu.get_storage_index()
654
655         # and if we use the first string again it should be the same as last time
656         u = upload.Data(DATA, convergence="wheee!")
657         eu = upload.EncryptAnUploadable(u)
658         d1salt1a = eu.get_storage_index()
659
660         # and if we change the encoding parameters, it should be different (from the same convergence string with different encoding parameters)
661         u = upload.Data(DATA, convergence="")
662         u.encoding_param_k = u.default_encoding_param_k + 1
663         eu = upload.EncryptAnUploadable(u)
664         d2 = eu.get_storage_index()
665
666         # and if we use a random key, it should be different than the CHK
667         u = upload.Data(DATA, convergence=None)
668         eu = upload.EncryptAnUploadable(u)
669         d3 = eu.get_storage_index()
670         # and different from another instance
671         u = upload.Data(DATA, convergence=None)
672         eu = upload.EncryptAnUploadable(u)
673         d4 = eu.get_storage_index()
674
675         d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
676         def _done(res):
677             si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
678             self.failUnlessEqual(si1, si1a)
679             self.failIfEqual(si1, si2)
680             self.failIfEqual(si1, si3)
681             self.failIfEqual(si1, si4)
682             self.failIfEqual(si3, si4)
683             self.failIfEqual(si1salt1, si1)
684             self.failIfEqual(si1salt1, si1salt2)
685             self.failIfEqual(si1salt2, si1)
686             self.failUnlessEqual(si1salt1, si1salt1a)
687         d.addCallback(_done)
688         return d
689
690 # copied from python docs because itertools.combinations was added in
691 # python 2.6 and we support >= 2.4.
692 def combinations(iterable, r):
693     # combinations('ABCD', 2) --> AB AC AD BC BD CD
694     # combinations(range(4), 3) --> 012 013 023 123
695     pool = tuple(iterable)
696     n = len(pool)
697     if r > n:
698         return
699     indices = range(r)
700     yield tuple(pool[i] for i in indices)
701     while True:
702         for i in reversed(range(r)):
703             if indices[i] != i + n - r:
704                 break
705         else:
706             return
707         indices[i] += 1
708         for j in range(i+1, r):
709             indices[j] = indices[j-1] + 1
710         yield tuple(pool[i] for i in indices)
711
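# Quick sanity sketch for the local combinations() above (illustrative only;
# never called by the suite). On Python >= 2.6 this matches
# itertools.combinations.
def _example_combinations():
    expected = [('A','B'), ('A','C'), ('A','D'),
                ('B','C'), ('B','D'), ('C','D')]
    assert list(combinations('ABCD', 2)) == expected
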
712 def is_happy_enough(servertoshnums, h, k):
713     """ I calculate whether servertoshnums achieves happiness level h. I do this with a naïve "brute force search" approach. (See src/allmydata/util/happinessutil.py for a better algorithm.) """
714     if len(servertoshnums) < h:
715         return False
716     # print "servertoshnums: ", servertoshnums, h, k
717     for happysetcombo in combinations(servertoshnums.iterkeys(), h):
718         # print "happysetcombo: ", happysetcombo
719         for subsetcombo in combinations(happysetcombo, k):
720             shnums = reduce(set.union, [ servertoshnums[s] for s in subsetcombo ])
721             # print "subsetcombo: ", subsetcombo, ", shnums: ", shnums
722             if len(shnums) < k:
723                 # print "NOT HAPPY", shnums, k
724                 return False
725     # print "HAPPY"
726     return True
727
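# A small worked sketch of is_happy_enough() above (illustrative only; never
# called by the suite): three servers each holding a distinct share satisfy
# happiness h=3 for k=3, but three servers that all hold copies of the same
# single share cannot, since no k of them cover k distinct shares.
def _example_is_happy_enough():
    assert is_happy_enough({0: set([0]), 1: set([1]), 2: set([2])}, h=3, k=3)
    assert not is_happy_enough({0: set([0]), 1: set([0]), 2: set([0])}, h=3, k=2)
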
728 class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
729     ShouldFailMixin):
730     def find_all_shares(self, unused=None):
731         """Locate shares on disk. Returns a dict that maps
732         server to set of sharenums.
733         """
734         assert self.g, "I tried to find a grid at self.g, but failed"
735         servertoshnums = {} # k: server, v: set(shnum)
736
737         for i, c in self.g.servers_by_number.iteritems():
738             for (dirp, dirns, fns) in os.walk(c.sharedir):
739                 for fn in fns:
740                     try:
741                         sharenum = int(fn)
742                     except ValueError:
743                         # Whoops, I guess that's not a share file then.
744                         pass
745                     else:
746                         servertoshnums.setdefault(i, set()).add(sharenum)
747
748         return servertoshnums
749
750     def _do_upload_with_broken_servers(self, servers_to_break):
751         """
752         I act like a normal upload, but before I send the results of
753         Tahoe2ServerSelector to the Encoder, I break the first
754         servers_to_break ServerTrackers in the upload_servers part of the
755         return result.
756         """
757         assert self.g, "I tried to find a grid at self.g, but failed"
758         broker = self.g.clients[0].storage_broker
759         sh     = self.g.clients[0]._secret_holder
760         data = upload.Data("data" * 10000, convergence="")
761         data.encoding_param_k = 3
762         data.encoding_param_happy = 4
763         data.encoding_param_n = 10
764         uploadable = upload.EncryptAnUploadable(data)
765         encoder = encode.Encoder()
766         encoder.set_encrypted_uploadable(uploadable)
767         status = upload.UploadStatus()
768         selector = upload.Tahoe2ServerSelector("dglev", "test", status)
769         storage_index = encoder.get_param("storage_index")
770         share_size = encoder.get_param("share_size")
771         block_size = encoder.get_param("block_size")
772         num_segments = encoder.get_param("num_segments")
773         d = selector.get_shareholders(broker, sh, storage_index,
774                                       share_size, block_size, num_segments,
775                                       10, 3, 4)
776         def _have_shareholders((upload_servers, already_servers)):
777             assert servers_to_break <= len(upload_servers)
778             for index in xrange(servers_to_break):
779                 server = list(upload_servers)[index]
780                 for share in server.buckets.keys():
781                     server.buckets[share].abort()
782             buckets = {}
783             servermap = already_servers.copy()
784             for server in upload_servers:
785                 buckets.update(server.buckets)
786                 for bucket in server.buckets:
787                     servermap.setdefault(bucket, set()).add(server.serverid)
788             encoder.set_shareholders(buckets, servermap)
789             d = encoder.start()
790             return d
791         d.addCallback(_have_shareholders)
792         return d
793
794     def _has_happy_share_distribution(self):
795         servertoshnums = self.find_all_shares()
796         k = self.g.clients[0].DEFAULT_ENCODING_PARAMETERS['k']
797         h = self.g.clients[0].DEFAULT_ENCODING_PARAMETERS['happy']
798         return is_happy_enough(servertoshnums, h, k)
799
800     def _add_server(self, server_number, readonly=False):
801         assert self.g, "I tried to find a grid at self.g, but failed"
802         ss = self.g.make_server(server_number, readonly)
803         log.msg("just created a server, number: %s => %s" % (server_number, ss,))
804         self.g.add_server(server_number, ss)
805
806     def _add_server_with_share(self, server_number, share_number=None,
807                                readonly=False):
808         self._add_server(server_number, readonly)
809         if share_number is not None:
810             self._copy_share_to_server(share_number, server_number)
811
812
813     def _copy_share_to_server(self, share_number, server_number):
814         ss = self.g.servers_by_number[server_number]
815         # Copy share i from the directory associated with the first
816         # storage server to the directory associated with this one.
817         assert self.g, "I tried to find a grid at self.g, but failed"
818         assert self.shares, "I tried to find shares at self.shares, but failed"
819         old_share_location = self.shares[share_number][2]
820         new_share_location = os.path.join(ss.storedir, "shares")
821         si = uri.from_string(self.uri).get_storage_index()
822         new_share_location = os.path.join(new_share_location,
823                                           storage_index_to_dir(si))
824         if not os.path.exists(new_share_location):
825             os.makedirs(new_share_location)
826         new_share_location = os.path.join(new_share_location,
827                                           str(share_number))
828         if old_share_location != new_share_location:
829             shutil.copy(old_share_location, new_share_location)
830         shares = self.find_uri_shares(self.uri)
831         # Make sure that the storage server has the share.
832         self.failUnless((share_number, ss.my_nodeid, new_share_location)
833                         in shares)
834
835     def _setup_grid(self):
836         """
837         I set up a NoNetworkGrid with a single server and client.
838         """
839         self.set_up_grid(num_clients=1, num_servers=1)
840
841     def _setup_and_upload(self, **kwargs):
842         """
843         I set up a NoNetworkGrid with a single server and client,
844         upload a file to it, store its uri in self.uri, and store its
845         sharedata in self.shares.
846         """
847         self._setup_grid()
848         client = self.g.clients[0]
849         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
850         if "n" in kwargs and "k" in kwargs:
851             client.DEFAULT_ENCODING_PARAMETERS['k'] = kwargs['k']
852             client.DEFAULT_ENCODING_PARAMETERS['n'] = kwargs['n']
853         data = upload.Data("data" * 10000, convergence="")
854         self.data = data
855         d = client.upload(data)
856         def _store_uri(ur):
857             self.uri = ur.uri
858         d.addCallback(_store_uri)
859         d.addCallback(lambda ign:
860             self.find_uri_shares(self.uri))
861         def _store_shares(shares):
862             self.shares = shares
863         d.addCallback(_store_shares)
864         return d
865
866     def test_configure_parameters(self):
867         self.basedir = self.mktemp()
868         hooks = {0: self._set_up_nodes_extra_config}
869         self.set_up_grid(client_config_hooks=hooks)
870         c0 = self.g.clients[0]
871
872         DATA = "data" * 100
873         u = upload.Data(DATA, convergence="")
874         d = c0.upload(u)
875         d.addCallback(lambda ur: c0.create_node_from_uri(ur.uri))
876         m = monitor.Monitor()
877         d.addCallback(lambda fn: fn.check(m))
878         def _check(cr):
879             data = cr.get_data()
880             self.failUnlessEqual(data["count-shares-needed"], 7)
881             self.failUnlessEqual(data["count-shares-expected"], 12)
882         d.addCallback(_check)
883         return d
884
885
886     def _setUp(self, ns):
887         # Used by test_happy_semantics and test_preexisting_share_behavior
888         # to set up the grid.
889         self.node = FakeClient(mode="good", num_servers=ns)
890         self.u = upload.Uploader()
891         self.u.running = True
892         self.u.parent = self.node
893
894
895     def test_happy_semantics(self):
896         self._setUp(2)
897         DATA = upload.Data("kittens" * 10000, convergence="")
898         # These parameters are unsatisfiable with only 2 servers.
899         self.set_encoding_parameters(k=3, happy=5, n=10)
900         d = self.shouldFail(UploadUnhappinessError, "test_happy_semantics",
901                             "shares could be placed or found on only 2 "
902                             "server(s). We were asked to place shares on "
903                             "at least 5 server(s) such that any 3 of them "
904                             "have enough shares to recover the file",
905                             self.u.upload, DATA)
906         # Let's reset the client to have 10 servers
907         d.addCallback(lambda ign:
908             self._setUp(10))
909         # These parameters are satisfiable with 10 servers.
910         d.addCallback(lambda ign:
911             self.set_encoding_parameters(k=3, happy=5, n=10))
912         d.addCallback(lambda ign:
913             self.u.upload(DATA))
914         # Let's reset the client to have 7 servers
915         # (this is less than n, but more than h)
916         d.addCallback(lambda ign:
917             self._setUp(7))
918         # These parameters are satisfiable with 7 servers.
919         d.addCallback(lambda ign:
920             self.set_encoding_parameters(k=3, happy=5, n=10))
921         d.addCallback(lambda ign:
922             self.u.upload(DATA))
923         return d
924
925     def test_aborted_shares(self):
926         self.basedir = "upload/EncodingParameters/aborted_shares"
927         self.set_up_grid(num_servers=4)
928         c = self.g.clients[0]
929         DATA = upload.Data(100* "kittens", convergence="")
930         # These parameters are unsatisfiable with only 4 servers, but should
931         # work with 5, as long as the original 4 are not stuck in the open
932         # BucketWriter state (open() but not closed).
933         parms = {"k":2, "happy":5, "n":5, "max_segment_size": 1*MiB}
934         c.DEFAULT_ENCODING_PARAMETERS = parms
935         d = self.shouldFail(UploadUnhappinessError, "test_aborted_shares",
936                             "shares could be placed on only 4 "
937                             "server(s) such that any 2 of them have enough "
938                             "shares to recover the file, but we were asked "
939                             "to place shares on at least 5 such servers",
940                             c.upload, DATA)
941         # now add the 5th server
942         d.addCallback(lambda ign: self._add_server(4, False))
943         # and this time the upload ought to succeed
944         d.addCallback(lambda ign: c.upload(DATA))
945         d.addCallback(lambda ign:
946             self.failUnless(self._has_happy_share_distribution()))
947         return d
948
949
950     def test_problem_layout_comment_52(self):
951         def _basedir():
952             self.basedir = self.mktemp()
953         _basedir()
954         # This scenario is at
955         # http://allmydata.org/trac/tahoe/ticket/778#comment:52
956         #
957         # The scenario in comment:52 proposes that we have a layout
958         # like:
959         # server 0: shares 1 - 9
960         # server 1: share 0, read-only
961         # server 2: share 0, read-only
962         # server 3: share 0, read-only
963         # To get access to the shares, we will first upload to one
964         # server, which will then have shares 0 - 9. We'll then
965         # add three new servers, configure them to not accept any new
966         # shares, then write share 0 directly into the serverdir of each,
967         # and then remove share 0 from server 0 in the same way.
968         # Then each of servers 1 - 3 will report that they have share 0,
969         # and will not accept any new share, while server 0 will report that
970         # it has shares 1 - 9 and will accept new shares.
971         # We'll then set 'happy' = 4, and see that an upload fails
972         # (as it should)
973         d = self._setup_and_upload()
974         d.addCallback(lambda ign:
975             self._add_server_with_share(server_number=1, share_number=0,
976                                         readonly=True))
977         d.addCallback(lambda ign:
978             self._add_server_with_share(server_number=2, share_number=0,
979                                         readonly=True))
980         d.addCallback(lambda ign:
981             self._add_server_with_share(server_number=3, share_number=0,
982                                         readonly=True))
983         # Remove the first share from server 0.
984         def _remove_share_0_from_server_0():
985             share_location = self.shares[0][2]
986             os.remove(share_location)
987         d.addCallback(lambda ign:
988             _remove_share_0_from_server_0())
989         # Set happy = 4 in the client.
990         def _prepare():
991             client = self.g.clients[0]
992             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
993             return client
994         d.addCallback(lambda ign:
995             _prepare())
996         # Uploading data should fail
997         d.addCallback(lambda client:
998             self.shouldFail(UploadUnhappinessError,
999                             "test_problem_layout_comment_52_test_1",
1000                             "shares could be placed or found on 4 server(s), "
1001                             "but they are not spread out evenly enough to "
1002                             "ensure that any 3 of these servers would have "
1003                             "enough shares to recover the file. "
1004                             "We were asked to place shares on at "
1005                             "least 4 servers such that any 3 of them have "
1006                             "enough shares to recover the file",
1007                             client.upload, upload.Data("data" * 10000,
1008                                                        convergence="")))
1009
1010         # Do comment:52, but like this:
1011         # server 2: empty
1012         # server 3: share 0, read-only
1013         # server 1: share 0, read-only
1014         # server 0: shares 0-9
1015         d.addCallback(lambda ign:
1016             _basedir())
1017         d.addCallback(lambda ign:
1018             self._setup_and_upload())
1019         d.addCallback(lambda ign:
1020             self._add_server(server_number=2))
1021         d.addCallback(lambda ign:
1022             self._add_server_with_share(server_number=3, share_number=0,
1023                                         readonly=True))
1024         d.addCallback(lambda ign:
1025             self._add_server_with_share(server_number=1, share_number=0,
1026                                         readonly=True))
1027         def _prepare2():
1028             client = self.g.clients[0]
1029             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1030             return client
1031         d.addCallback(lambda ign:
1032             _prepare2())
1033         d.addCallback(lambda client:
1034             self.shouldFail(UploadUnhappinessError,
1035                             "test_problem_layout_comment_52_test_2",
1036                             "shares could be placed on only 3 server(s) such "
1037                             "that any 3 of them have enough shares to recover "
1038                             "the file, but we were asked to place shares on "
1039                             "at least 4 such servers.",
1040                             client.upload, upload.Data("data" * 10000,
1041                                                        convergence="")))
1042         return d
1043
1044
1045     def test_problem_layout_comment_53(self):
1046         # This scenario is at
1047         # http://allmydata.org/trac/tahoe/ticket/778#comment:53
1048         #
1049         # Set up the grid to have one server
1050         def _change_basedir(ign):
1051             self.basedir = self.mktemp()
1052         _change_basedir(None)
1053         # We start by uploading all of the shares to one server.
1054         # Next, we'll add three new servers to our NoNetworkGrid. We'll add
1055         # one share from our initial upload to each of these.
1056         # The counterintuitive ordering of the share numbers is to deal with
1057         # the permuting of these servers -- distributing the shares this
1058         # way ensures that the Tahoe2ServerSelector sees them in the order
1059         # described below.
1060         d = self._setup_and_upload()
1061         d.addCallback(lambda ign:
1062             self._add_server_with_share(server_number=1, share_number=2))
1063         d.addCallback(lambda ign:
1064             self._add_server_with_share(server_number=2, share_number=0))
1065         d.addCallback(lambda ign:
1066             self._add_server_with_share(server_number=3, share_number=1))
1067         # So, we now have the following layout:
1068         # server 0: shares 0 - 9
1069         # server 1: share 2
1070         # server 2: share 0
1071         # server 3: share 1
1072         # We change the 'happy' parameter in the client to 4.
1073         # The Tahoe2ServerSelector will see the servers permuted as:
1074         # 2, 3, 1, 0
1075         # Ideally, a reupload of our original data should work.
1076         def _reset_encoding_parameters(ign, happy=4):
1077             client = self.g.clients[0]
1078             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1079             return client
1080         d.addCallback(_reset_encoding_parameters)
1081         d.addCallback(lambda client:
1082             client.upload(upload.Data("data" * 10000, convergence="")))
1083         d.addCallback(lambda ign:
1084             self.failUnless(self._has_happy_share_distribution()))
1085
1086
1087         # This scenario is basically comment:53, but changed so that the
1088         # Tahoe2ServerSelector sees the server with all of the shares before
1089         # any of the other servers.
1090         # The layout is:
1091         # server 2: shares 0 - 9
1092         # server 3: share 0
1093         # server 1: share 1
1094         # server 4: share 2
1095         # The Tahoe2ServerSelector sees the servers permuted as:
1096         # 2, 3, 1, 4
1097         # Note that server 0 has been replaced by server 4; this makes it
1098         # easier to ensure that the last server seen by Tahoe2ServerSelector
1099         # has only one share.
1100         d.addCallback(_change_basedir)
1101         d.addCallback(lambda ign:
1102             self._setup_and_upload())
1103         d.addCallback(lambda ign:
1104             self._add_server_with_share(server_number=2, share_number=0))
1105         d.addCallback(lambda ign:
1106             self._add_server_with_share(server_number=3, share_number=1))
1107         d.addCallback(lambda ign:
1108             self._add_server_with_share(server_number=1, share_number=2))
1109         # Copy all of the other shares to server number 2
1110         def _copy_shares(ign):
1111             for i in xrange(0, 10):
1112                 self._copy_share_to_server(i, 2)
1113         d.addCallback(_copy_shares)
1114         # Remove the first server, and add a placeholder with share 0
1115         d.addCallback(lambda ign:
1116             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1117         d.addCallback(lambda ign:
1118             self._add_server_with_share(server_number=4, share_number=0))
1119         # Now try uploading.
1120         d.addCallback(_reset_encoding_parameters)
1121         d.addCallback(lambda client:
1122             client.upload(upload.Data("data" * 10000, convergence="")))
1123         d.addCallback(lambda ign:
1124             self.failUnless(self._has_happy_share_distribution()))
1125
1126
1127         # Try the same thing, but with empty servers after the first one
1128         # We want to make sure that Tahoe2ServerSelector will redistribute
1129         # shares as necessary, not simply discover an existing layout.
1130         # The layout is:
1131         # server 2: shares 0 - 9
1132         # server 3: empty
1133         # server 1: empty
1134         # server 4: empty
1135         d.addCallback(_change_basedir)
1136         d.addCallback(lambda ign:
1137             self._setup_and_upload())
1138         d.addCallback(lambda ign:
1139             self._add_server(server_number=2))
1140         d.addCallback(lambda ign:
1141             self._add_server(server_number=3))
1142         d.addCallback(lambda ign:
1143             self._add_server(server_number=1))
1144         d.addCallback(lambda ign:
1145             self._add_server(server_number=4))
1146         d.addCallback(_copy_shares)
1147         d.addCallback(lambda ign:
1148             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1149         d.addCallback(_reset_encoding_parameters)
1150         d.addCallback(lambda client:
1151             client.upload(upload.Data("data" * 10000, convergence="")))
1152         # Make sure that only as many shares as necessary to satisfy
1153         # servers of happiness were pushed.
1154         d.addCallback(lambda results:
1155             self.failUnlessEqual(results.pushed_shares, 3))
1156         d.addCallback(lambda ign:
1157             self.failUnless(self._has_happy_share_distribution()))
1158         return d
1159
1160     def test_problem_layout_ticket_1124(self):
1161         self.basedir = self.mktemp()
1162         d = self._setup_and_upload(k=2, n=4)
1163
1164         # server 0: shares 0, 1, 2, 3
1165         # server 1: shares 0, 3
1166         # server 2: share 1
1167         # server 3: share 2
1168         # With this layout, an upload should just be satisfied that the current distribution is good enough, right?
1169         def _setup(ign):
1170             self._add_server_with_share(server_number=0, share_number=None)
1171             self._add_server_with_share(server_number=1, share_number=0)
1172             self._add_server_with_share(server_number=2, share_number=1)
1173             self._add_server_with_share(server_number=3, share_number=2)
1174             # Copy shares
1175             self._copy_share_to_server(3, 1)
1176             client = self.g.clients[0]
1177             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1178             return client
1179
1180         d.addCallback(_setup)
1181         d.addCallback(lambda client:
1182             client.upload(upload.Data("data" * 10000, convergence="")))
1183         d.addCallback(lambda ign:
1184             self.failUnless(self._has_happy_share_distribution()))
1185         return d
1186     test_problem_layout_ticket_1124.todo = "Fix this after 1.7.1 release."
1187
1188     def test_happiness_with_some_readonly_servers(self):
1189         # Try the following layout
1190         # server 2: shares 0-9
1191         # server 4: share 0, read-only
1192         # server 3: share 1, read-only
1193         # server 1: share 2, read-only
1194         self.basedir = self.mktemp()
1195         d = self._setup_and_upload()
1196         d.addCallback(lambda ign:
1197             self._add_server_with_share(server_number=2, share_number=0))
1198         d.addCallback(lambda ign:
1199             self._add_server_with_share(server_number=3, share_number=1,
1200                                         readonly=True))
1201         d.addCallback(lambda ign:
1202             self._add_server_with_share(server_number=1, share_number=2,
1203                                         readonly=True))
1204         # Copy all of the other shares to server number 2
1205         def _copy_shares(ign):
1206             for i in xrange(1, 10):
1207                 self._copy_share_to_server(i, 2)
1208         d.addCallback(_copy_shares)
1209         # Remove server 0, and add another in its place
1210         d.addCallback(lambda ign:
1211             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1212         d.addCallback(lambda ign:
1213             self._add_server_with_share(server_number=4, share_number=0,
1214                                         readonly=True))
1215         def _reset_encoding_parameters(ign, happy=4):
1216             client = self.g.clients[0]
1217             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1218             return client
1219         d.addCallback(_reset_encoding_parameters)
1220         d.addCallback(lambda client:
1221             client.upload(upload.Data("data" * 10000, convergence="")))
1222         d.addCallback(lambda ign:
1223             self.failUnless(self._has_happy_share_distribution()))
1224         return d
1225
1226
1227     def test_happiness_with_all_readonly_servers(self):
1228         # server 3: share 1, read-only
1229         # server 1: share 2, read-only
1230         # server 2: shares 0-9, read-only
1231         # server 4: share 0, read-only
1232         # The idea with this test is to make sure that the survey of
1233         # read-only servers doesn't undercount servers of happiness
1234         self.basedir = self.mktemp()
1235         d = self._setup_and_upload()
1236         d.addCallback(lambda ign:
1237             self._add_server_with_share(server_number=4, share_number=0,
1238                                         readonly=True))
1239         d.addCallback(lambda ign:
1240             self._add_server_with_share(server_number=3, share_number=1,
1241                                         readonly=True))
1242         d.addCallback(lambda ign:
1243             self._add_server_with_share(server_number=1, share_number=2,
1244                                         readonly=True))
1245         d.addCallback(lambda ign:
1246             self._add_server_with_share(server_number=2, share_number=0,
1247                                         readonly=True))
1248         def _copy_shares(ign):
1249             for i in xrange(1, 10):
1250                 self._copy_share_to_server(i, 2)
1251         d.addCallback(_copy_shares)
1252         d.addCallback(lambda ign:
1253             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1254         def _reset_encoding_parameters(ign, happy=4):
1255             client = self.g.clients[0]
1256             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1257             return client
1258         d.addCallback(_reset_encoding_parameters)
1259         d.addCallback(lambda client:
1260             client.upload(upload.Data("data" * 10000, convergence="")))
1261         d.addCallback(lambda ign:
1262             self.failUnless(self._has_happy_share_distribution()))
1263         return d
1264
1265
1266     def test_dropped_servers_in_encoder(self):
1267         # The Encoder does its own "servers_of_happiness" check if it
1268         # happens to lose a bucket during an upload (it assumes that
1269         # the layout presented to it satisfies "servers_of_happiness"
1270         # until a failure occurs)
1271         #
1272         # This test simulates an upload where servers break after server
1273         # selection, but before they are written to.
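        # (Illustrative sketch, not part of the original test: the check the
        # Encoder performs can be approximated with the helpers imported at
        # the top of this file; the names "existing_shares",
        # "surviving_trackers" and "required_happiness" are made up here.)
        #
        #   remaining = merge_peers(existing_shares, set(surviving_trackers))
        #   if servers_of_happiness(remaining) < required_happiness:
        #       raise UploadUnhappinessError()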
1274         def _set_basedir(ign=None):
1275             self.basedir = self.mktemp()
1276         _set_basedir()
1277         d = self._setup_and_upload()
1278         # Add 5 servers
1279         def _do_server_setup(ign):
1280             self._add_server(server_number=1)
1281             self._add_server(server_number=2)
1282             self._add_server(server_number=3)
1283             self._add_server(server_number=4)
1284             self._add_server(server_number=5)
1285         d.addCallback(_do_server_setup)
1286         # remove the original server
1287         # (necessary to ensure that the Tahoe2ServerSelector will distribute
1288         #  all the shares)
1289         def _remove_server(ign):
1290             server = self.g.servers_by_number[0]
1291             self.g.remove_server(server.my_nodeid)
1292         d.addCallback(_remove_server)
1293         # This should succeed; we still have 4 servers, and the
1294         # happiness of the upload is 4.
1295         d.addCallback(lambda ign:
1296             self._do_upload_with_broken_servers(1))
1297         # Now, do the same thing over again, but drop 2 servers instead
1298         # of 1. This should fail, because servers_of_happiness is 4 and
1299         # we can't satisfy that.
1300         d.addCallback(_set_basedir)
1301         d.addCallback(lambda ign:
1302             self._setup_and_upload())
1303         d.addCallback(_do_server_setup)
1304         d.addCallback(_remove_server)
1305         d.addCallback(lambda ign:
1306             self.shouldFail(UploadUnhappinessError,
1307                             "test_dropped_servers_in_encoder",
1308                             "shares could be placed on only 3 server(s) "
1309                             "such that any 3 of them have enough shares to "
1310                             "recover the file, but we were asked to place "
1311                             "shares on at least 4",
1312                             self._do_upload_with_broken_servers, 2))
1313         # Now do the same thing over again, but make some of the servers
1314         # readonly, break some of the ones that aren't, and make sure that
1315         # happiness accounting is preserved.
1316         d.addCallback(_set_basedir)
1317         d.addCallback(lambda ign:
1318             self._setup_and_upload())
1319         def _do_server_setup_2(ign):
1320             self._add_server(1)
1321             self._add_server(2)
1322             self._add_server(3)
1323             self._add_server_with_share(4, 7, readonly=True)
1324             self._add_server_with_share(5, 8, readonly=True)
1325         d.addCallback(_do_server_setup_2)
1326         d.addCallback(_remove_server)
1327         d.addCallback(lambda ign:
1328             self._do_upload_with_broken_servers(1))
1329         d.addCallback(_set_basedir)
1330         d.addCallback(lambda ign:
1331             self._setup_and_upload())
1332         d.addCallback(_do_server_setup_2)
1333         d.addCallback(_remove_server)
1334         d.addCallback(lambda ign:
1335             self.shouldFail(UploadUnhappinessError,
1336                             "test_dropped_servers_in_encoder",
1337                             "shares could be placed on only 3 server(s) "
1338                             "such that any 3 of them have enough shares to "
1339                             "recover the file, but we were asked to place "
1340                             "shares on at least 4",
1341                             self._do_upload_with_broken_servers, 2))
1342         return d
1343
1344
1345     def test_merge_peers(self):
1346         # merge_peers merges a list of upload_servers and a dict of
1347         # shareid -> serverid mappings.
1348         shares = {
1349                     1 : set(["server1"]),
1350                     2 : set(["server2"]),
1351                     3 : set(["server3"]),
1352                     4 : set(["server4", "server5"]),
1353                     5 : set(["server1", "server2"]),
1354                  }
1355         # If the upload_servers argument is empty, merge_peers should just
1356         # return the first argument unchanged.
1357         self.failUnlessEqual(shares, merge_peers(shares, set([])))
1358         class FakeServerTracker:
1359             pass
1360         trackers = []
1361         for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1362             t = FakeServerTracker()
1363             t.serverid = server
1364             t.buckets = [i]
1365             trackers.append(t)
1366         expected = {
1367                     1 : set(["server1"]),
1368                     2 : set(["server2"]),
1369                     3 : set(["server3"]),
1370                     4 : set(["server4", "server5"]),
1371                     5 : set(["server1", "server2", "server5"]),
1372                     6 : set(["server6"]),
1373                     7 : set(["server7"]),
1374                     8 : set(["server8"]),
1375                    }
1376         self.failUnlessEqual(expected, merge_peers(shares, set(trackers)))
1377         shares2 = {}
1378         expected = {
1379                     5 : set(["server5"]),
1380                     6 : set(["server6"]),
1381                     7 : set(["server7"]),
1382                     8 : set(["server8"]),
1383                    }
1384         self.failUnlessEqual(expected, merge_peers(shares2, set(trackers)))
1385         shares3 = {}
1386         trackers = []
1387         expected = {}
1388         for (i, server) in [(i, "server%d" % i) for i in xrange(10)]:
1389             shares3[i] = set([server])
1390             t = FakeServerTracker()
1391             t.serverid = server
1392             t.buckets = [i]
1393             trackers.append(t)
1394             expected[i] = set([server])
1395         self.failUnlessEqual(expected, merge_peers(shares3, set(trackers)))
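        # (Illustrative sketch, not part of the original test: a tracker that
        # holds more than one bucket should contribute its serverid to each of
        # those share numbers. "server9" is a made-up id for this example.)
        multi = FakeServerTracker()
        multi.serverid = "server9"
        multi.buckets = [1, 2]
        merged = merge_peers({}, set([multi]))
        self.failUnlessEqual({1: set(["server9"]), 2: set(["server9"])}, merged)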
1396
1397
1398     def test_servers_of_happiness_utility_function(self):
1399         # These tests are concerned with the servers_of_happiness()
1400         # utility function, and its underlying matching algorithm. Other
1401         # aspects of the servers_of_happiness behavior are tested
1402         # elsewhere. These tests exist to ensure that
1403         # servers_of_happiness doesn't under- or over-count the happiness
1404         # value for given inputs.
1405
1406         # servers_of_happiness expects a dict of
1407         # shnum => set(serverids) as a preexisting shares argument.
1408         test1 = {
1409                  1 : set(["server1"]),
1410                  2 : set(["server2"]),
1411                  3 : set(["server3"]),
1412                  4 : set(["server4"])
1413                 }
1414         happy = servers_of_happiness(test1)
1415         self.failUnlessEqual(4, happy)
1416         test1[4] = set(["server1"])
1417         # We've added a duplicate server, so now servers_of_happiness
1418         # should be 3 instead of 4.
1419         happy = servers_of_happiness(test1)
1420         self.failUnlessEqual(3, happy)
1421         # The second argument of merge_peers should be a set of objects
1422         # with serverid and buckets as attributes. In actual use, these
1423         # will be ServerTracker instances, but for testing it is fine to
1424         # use a FakeServerTracker that simply holds those two instance
1425         # variables.
1426         class FakeServerTracker:
1427             pass
1428         trackers = []
1429         for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1430             t = FakeServerTracker()
1431             t.serverid = server
1432             t.buckets = [i]
1433             trackers.append(t)
1434         # Recall that test1 is a server layout with servers_of_happiness
1435         # = 3.  Since there isn't any overlap between the shnum ->
1436         # set([serverid]) correspondences in test1 and those in trackers,
1437         # the result here should be 7.
1438         test2 = merge_peers(test1, set(trackers))
1439         happy = servers_of_happiness(test2)
1440         self.failUnlessEqual(7, happy)
1441         # Now add an overlapping server to trackers. This is redundant,
1442         # so it should not cause the previously reported happiness value
1443         # to change.
1444         t = FakeServerTracker()
1445         t.serverid = "server1"
1446         t.buckets = [1]
1447         trackers.append(t)
1448         test2 = merge_peers(test1, set(trackers))
1449         happy = servers_of_happiness(test2)
1450         self.failUnlessEqual(7, happy)
1451         test = {}
1452         happy = servers_of_happiness(test)
1453         self.failUnlessEqual(0, happy)
1454         # Test a more substantial overlap between the trackers and the
1455         # existing assignments.
1456         test = {
1457             1 : set(['server1']),
1458             2 : set(['server2']),
1459             3 : set(['server3']),
1460             4 : set(['server4']),
1461         }
1462         trackers = []
1463         t = FakeServerTracker()
1464         t.serverid = 'server5'
1465         t.buckets = [4]
1466         trackers.append(t)
1467         t = FakeServerTracker()
1468         t.serverid = 'server6'
1469         t.buckets = [3, 5]
1470         trackers.append(t)
1471         # The value returned by servers_of_happiness is the size
1472         # of a maximum matching in the bipartite graph that
1473         # servers_of_happiness() makes between serverids and share
1474         # numbers. It should find something like this:
1475         # (server 1, share 1)
1476         # (server 2, share 2)
1477         # (server 3, share 3)
1478         # (server 5, share 4)
1479         # (server 6, share 5)
1480         #
1481         # and, since there are 5 edges in this matching, it should
1482         # return 5.
1483         test2 = merge_peers(test, set(trackers))
1484         happy = servers_of_happiness(test2)
1485         self.failUnlessEqual(5, happy)
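        # (Illustrative sketch, not part of the original test: a layout in
        # which every share lands on the same server should collapse to a
        # happiness of 1, since the matching can use that server only once.
        # "serverA" is a made-up id for this example.)
        single_server_layout = dict([(shnum, set(["serverA"]))
                                     for shnum in xrange(5)])
        self.failUnlessEqual(1, servers_of_happiness(single_server_layout))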
1486         # Zooko's first puzzle:
1487         # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:156)
1488         #
1489         # server 1: shares 0, 1
1490         # server 2: shares 1, 2
1491         # server 3: share 2
1492         #
1493         # This should yield happiness of 3.
1494         test = {
1495             0 : set(['server1']),
1496             1 : set(['server1', 'server2']),
1497             2 : set(['server2', 'server3']),
1498         }
1499         self.failUnlessEqual(3, servers_of_happiness(test))
1500         # Zooko's second puzzle:
1501         # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:158)
1502         #
1503         # server 1: shares 0, 1
1504         # server 2: share 1
1505         #
1506         # This should yield happiness of 2.
1507         test = {
1508             0 : set(['server1']),
1509             1 : set(['server1', 'server2']),
1510         }
1511         self.failUnlessEqual(2, servers_of_happiness(test))
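        # (Illustrative sketch, not part of the original puzzles: happiness is
        # bounded above by both the number of distinct shares and the number
        # of distinct servers, since the matching pairs each server with at
        # most one share and vice versa.)
        one_share_many_servers = {
            0 : set(['server1', 'server2', 'server3']),
        }
        self.failUnlessEqual(1, servers_of_happiness(one_share_many_servers))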
1512
1513
1514     def test_shares_by_server(self):
1515         test = dict([(i, set(["server%d" % i])) for i in xrange(1, 5)])
1516         sbs = shares_by_server(test)
1517         self.failUnlessEqual(set([1]), sbs["server1"])
1518         self.failUnlessEqual(set([2]), sbs["server2"])
1519         self.failUnlessEqual(set([3]), sbs["server3"])
1520         self.failUnlessEqual(set([4]), sbs["server4"])
1521         test1 = {
1522                     1 : set(["server1"]),
1523                     2 : set(["server1"]),
1524                     3 : set(["server1"]),
1525                     4 : set(["server2"]),
1526                     5 : set(["server2"])
1527                 }
1528         sbs = shares_by_server(test1)
1529         self.failUnlessEqual(set([1, 2, 3]), sbs["server1"])
1530         self.failUnlessEqual(set([4, 5]), sbs["server2"])
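        # (Illustrative sketch, not part of the original test: shares_by_server
        # inverts the shnum -> set(serverids) mapping, so a share held by two
        # servers should show up under both serverids. "serverA" and "serverB"
        # are made-up ids for this example.)
        test_overlap = {1: set(["serverA", "serverB"])}
        sbs = shares_by_server(test_overlap)
        self.failUnlessEqual(set([1]), sbs["serverA"])
        self.failUnlessEqual(set([1]), sbs["serverB"])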
1531         # This should fail unless the serverid part of the mapping is a set
1532         test2 = {1: "server1"}
1533         self.shouldFail(AssertionError,
1534                        "test_shares_by_server",
1535                        "",
1536                        shares_by_server, test2)
1537
1538
1539     def test_existing_share_detection(self):
1540         self.basedir = self.mktemp()
1541         d = self._setup_and_upload()
1542         # Our final setup should look like this:
1543         # server 1: shares 0 - 9, read-only
1544         # server 2: empty
1545         # server 3: empty
1546         # server 4: empty
1547         # The purpose of this test is to make sure that the server selector
1548         # knows about the shares on server 1, even though it is read-only.
1549         # It used to simply filter out read-only servers, which would cause
1550         # this test to fail when servers_of_happiness = 4.
1551         d.addCallback(lambda ign:
1552             self._add_server_with_share(1, 0, True))
1553         d.addCallback(lambda ign:
1554             self._add_server(2))
1555         d.addCallback(lambda ign:
1556             self._add_server(3))
1557         d.addCallback(lambda ign:
1558             self._add_server(4))
1559         def _copy_shares(ign):
1560             for i in xrange(1, 10):
1561                 self._copy_share_to_server(i, 1)
1562         d.addCallback(_copy_shares)
1563         d.addCallback(lambda ign:
1564             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1565         def _prepare_client(ign):
1566             client = self.g.clients[0]
1567             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1568             return client
1569         d.addCallback(_prepare_client)
1570         d.addCallback(lambda client:
1571             client.upload(upload.Data("data" * 10000, convergence="")))
1572         d.addCallback(lambda ign:
1573             self.failUnless(self._has_happy_share_distribution()))
1574         return d
1575
1576
1577     def test_query_counting(self):
1578         # If server selection fails, Tahoe2ServerSelector prints out a lot
1579         # of helpful diagnostic information, including query stats.
1580         # This test helps make sure that the reported information is accurate.
1581         self.basedir = self.mktemp()
1582         d = self._setup_and_upload()
1583         def _setup(ign):
1584             for i in xrange(1, 11):
1585                 self._add_server(server_number=i)
1586             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1587             c = self.g.clients[0]
1588             # We set happy to an unsatisfiable value so that we can check the
1589             # counting in the exception message. The same progress message
1590             # is also used when the upload is successful, but in that case it
1591             # only gets written to a log, so we can't see what it says.
1592             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1593             return c
1594         d.addCallback(_setup)
1595         d.addCallback(lambda c:
1596             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1597                             "10 queries placed some shares",
1598                             c.upload, upload.Data("data" * 10000,
1599                                                   convergence="")))
1600         # Now try with some readonly servers. We want to make sure that
1601         # the readonly server share discovery phase is counted correctly.
1602         def _reset(ign):
1603             self.basedir = self.mktemp()
1604             self.g = None
1605         d.addCallback(_reset)
1606         d.addCallback(lambda ign:
1607             self._setup_and_upload())
1608         def _then(ign):
1609             for i in xrange(1, 11):
1610                 self._add_server(server_number=i)
1611             self._add_server(server_number=11, readonly=True)
1612             self._add_server(server_number=12, readonly=True)
1613             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1614             c = self.g.clients[0]
1615             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1616             return c
1617         d.addCallback(_then)
1618         d.addCallback(lambda c:
1619             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1620                             "2 placed none (of which 2 placed none due to "
1621                             "the server being full",
1622                             c.upload, upload.Data("data" * 10000,
1623                                                   convergence="")))
1624         # Now try the case where the upload process finds a bunch of the
1625         # shares that it wants to place already present on the first
1626         # server, including the one that it wanted to allocate there.
1627         # Though no shares will be allocated in this request, it should
1628         # still count as productive, since it removed some homeless
1629         # shares.
1630         d.addCallback(_reset)
1631         d.addCallback(lambda ign:
1632             self._setup_and_upload())
1633
1634         def _next(ign):
1635             for i in xrange(1, 11):
1636                 self._add_server(server_number=i)
1637             # Copy all of the shares to server 9, since that will be
1638             # the first one that the selector sees.
1639             for i in xrange(10):
1640                 self._copy_share_to_server(i, 9)
1641             # Remove server 0, and its contents
1642             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1643             # Make happiness unsatisfiable
1644             c = self.g.clients[0]
1645             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1646             return c
1647         d.addCallback(_next)
1648         d.addCallback(lambda c:
1649             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1650                             "1 queries placed some shares",
1651                             c.upload, upload.Data("data" * 10000,
1652                                                   convergence="")))
1653         return d
1654
1655
1656     def test_upper_limit_on_readonly_queries(self):
1657         self.basedir = self.mktemp()
1658         d = self._setup_and_upload()
1659         def _then(ign):
1660             for i in xrange(1, 11):
1661                 self._add_server(server_number=i, readonly=True)
1662             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1663             c = self.g.clients[0]
1664             c.DEFAULT_ENCODING_PARAMETERS['k'] = 2
1665             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1666             c.DEFAULT_ENCODING_PARAMETERS['n'] = 4
1667             return c
1668         d.addCallback(_then)
1669         d.addCallback(lambda client:
1670             self.shouldFail(UploadUnhappinessError,
1671                             "test_upper_limit_on_readonly_queries",
1672                             "sent 8 queries to 8 servers",
1673                             client.upload,
1674                             upload.Data('data' * 10000, convergence="")))
1675         return d
1676
1677
1678     def test_exception_messages_during_server_selection(self):
1679         # server 1: read-only, no shares
1680         # server 2: read-only, no shares
1681         # server 3: read-only, no shares
1682         # server 4: read-only, no shares
1683         # server 5: read-only, no shares
1684         # This will fail, but we want to make sure that the log messages
1685         # are informative about why it has failed.
1686         self.basedir = self.mktemp()
1687         d = self._setup_and_upload()
1688         d.addCallback(lambda ign:
1689             self._add_server(server_number=1, readonly=True))
1690         d.addCallback(lambda ign:
1691             self._add_server(server_number=2, readonly=True))
1692         d.addCallback(lambda ign:
1693             self._add_server(server_number=3, readonly=True))
1694         d.addCallback(lambda ign:
1695             self._add_server(server_number=4, readonly=True))
1696         d.addCallback(lambda ign:
1697             self._add_server(server_number=5, readonly=True))
1698         d.addCallback(lambda ign:
1699             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1700         def _reset_encoding_parameters(ign, happy=4):
1701             client = self.g.clients[0]
1702             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1703             return client
1704         d.addCallback(_reset_encoding_parameters)
1705         d.addCallback(lambda client:
1706             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1707                             "placed 0 shares out of 10 "
1708                             "total (10 homeless), want to place shares on at "
1709                             "least 4 servers such that any 3 of them have "
1710                             "enough shares to recover the file, "
1711                             "sent 5 queries to 5 servers, 0 queries placed "
1712                             "some shares, 5 placed none "
1713                             "(of which 5 placed none due to the server being "
1714                             "full and 0 placed none due to an error)",
1715                             client.upload,
1716                             upload.Data("data" * 10000, convergence="")))
1717
1718
1719         # server 1: read-only, no shares
1720         # server 2: broken, no shares
1721         # server 3: read-only, no shares
1722         # server 4: read-only, no shares
1723         # server 5: read-only, no shares
1724         def _reset(ign):
1725             self.basedir = self.mktemp()
1726         d.addCallback(_reset)
1727         d.addCallback(lambda ign:
1728             self._setup_and_upload())
1729         d.addCallback(lambda ign:
1730             self._add_server(server_number=1, readonly=True))
1731         d.addCallback(lambda ign:
1732             self._add_server(server_number=2))
1733         def _break_server_2(ign):
1734             serverid = self.g.servers_by_number[2].my_nodeid
1735             self.g.break_server(serverid)
1736         d.addCallback(_break_server_2)
1737         d.addCallback(lambda ign:
1738             self._add_server(server_number=3, readonly=True))
1739         d.addCallback(lambda ign:
1740             self._add_server(server_number=4, readonly=True))
1741         d.addCallback(lambda ign:
1742             self._add_server(server_number=5, readonly=True))
1743         d.addCallback(lambda ign:
1744             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1745         d.addCallback(_reset_encoding_parameters)
1746         d.addCallback(lambda client:
1747             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1748                             "placed 0 shares out of 10 "
1749                             "total (10 homeless), want to place shares on at "
1750                             "least 4 servers such that any 3 of them have "
1751                             "enough shares to recover the file, "
1752                             "sent 5 queries to 5 servers, 0 queries placed "
1753                             "some shares, 5 placed none "
1754                             "(of which 4 placed none due to the server being "
1755                             "full and 1 placed none due to an error)",
1756                             client.upload,
1757                             upload.Data("data" * 10000, convergence="")))
1758         # server 0, server 1 = empty, accepting shares
1759         # This should place all of the shares, but still fail with happy=4.
1760         # We want to make sure that the exception message is worded correctly.
1761         d.addCallback(_reset)
1762         d.addCallback(lambda ign:
1763             self._setup_grid())
1764         d.addCallback(lambda ign:
1765             self._add_server(server_number=1))
1766         d.addCallback(_reset_encoding_parameters)
1767         d.addCallback(lambda client:
1768             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1769                             "shares could be placed or found on only 2 "
1770                             "server(s). We were asked to place shares on at "
1771                             "least 4 server(s) such that any 3 of them have "
1772                             "enough shares to recover the file.",
1773                             client.upload, upload.Data("data" * 10000,
1774                                                        convergence="")))
1775         # servers 0 - 4 = empty, accepting shares
1776         # This too should place all the shares, and this too should fail,
1777         # but since the effective happiness is more than the k encoding
1778         # parameter, it should trigger a different error message than the one
1779         # above.
1780         d.addCallback(_reset)
1781         d.addCallback(lambda ign:
1782             self._setup_grid())
1783         d.addCallback(lambda ign:
1784             self._add_server(server_number=1))
1785         d.addCallback(lambda ign:
1786             self._add_server(server_number=2))
1787         d.addCallback(lambda ign:
1788             self._add_server(server_number=3))
1789         d.addCallback(lambda ign:
1790             self._add_server(server_number=4))
1791         d.addCallback(_reset_encoding_parameters, happy=7)
1792         d.addCallback(lambda client:
1793             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1794                             "shares could be placed on only 5 server(s) such "
1795                             "that any 3 of them have enough shares to recover "
1796                             "the file, but we were asked to place shares on "
1797                             "at least 7 such servers.",
1798                             client.upload, upload.Data("data" * 10000,
1799                                                        convergence="")))
1800         # server 0: shares 0 - 9
1801         # server 1: share 0, read-only
1802         # server 2: share 0, read-only
1803         # server 3: share 0, read-only
1804         # This should place all of the shares, but fail with happy=4.
1805         # Since the number of servers with shares is more than the number
1806         # necessary to reconstitute the file, this will trigger a different
1807         # error message than either of those above.
1808         d.addCallback(_reset)
1809         d.addCallback(lambda ign:
1810             self._setup_and_upload())
1811         d.addCallback(lambda ign:
1812             self._add_server_with_share(server_number=1, share_number=0,
1813                                         readonly=True))
1814         d.addCallback(lambda ign:
1815             self._add_server_with_share(server_number=2, share_number=0,
1816                                         readonly=True))
1817         d.addCallback(lambda ign:
1818             self._add_server_with_share(server_number=3, share_number=0,
1819                                         readonly=True))
1820         d.addCallback(_reset_encoding_parameters, happy=7)
1821         d.addCallback(lambda client:
1822             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1823                             "shares could be placed or found on 4 server(s), "
1824                             "but they are not spread out evenly enough to "
1825                             "ensure that any 3 of these servers would have "
1826                             "enough shares to recover the file. We were asked "
1827                             "to place shares on at least 7 servers such that "
1828                             "any 3 of them have enough shares to recover the "
1829                             "file",
1830                             client.upload, upload.Data("data" * 10000,
1831                                                        convergence="")))
1832         return d
1833
1834
1835     def test_problem_layout_comment_187(self):
1836         # The layout in #778 comment 187 broke an initial attempt at a
1837         # share redistribution algorithm. This test is here to demonstrate
1838         # the breakage, and to test that subsequent algorithms don't also
1839         # break in the same way.
1840         self.basedir = self.mktemp()
1841         d = self._setup_and_upload(k=2, n=3)
1842
1843         # server 1: shares 0, 1, 2, readonly
1844         # server 2: share 0, readonly
1845         # server 3: share 0
1846         def _setup(ign):
1847             self._add_server_with_share(server_number=1, share_number=0,
1848                                         readonly=True)
1849             self._add_server_with_share(server_number=2, share_number=0,
1850                                         readonly=True)
1851             self._add_server_with_share(server_number=3, share_number=0)
1852             # Copy shares
1853             self._copy_share_to_server(1, 1)
1854             self._copy_share_to_server(2, 1)
1855             # Remove server 0
1856             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1857             client = self.g.clients[0]
1858             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 3
1859             return client
1860
1861         d.addCallback(_setup)
1862         d.addCallback(lambda client:
1863             client.upload(upload.Data("data" * 10000, convergence="")))
1864         d.addCallback(lambda ign:
1865             self.failUnless(self._has_happy_share_distribution()))
1866         return d
1867     test_problem_layout_comment_187.todo = "this isn't fixed yet"
1868
1869     def test_problem_layout_ticket_1118(self):
1870         # #1118 includes a report from a user who hit an assertion in
1871         # the upload code with this layout.
1872         self.basedir = self.mktemp()
1873         d = self._setup_and_upload(k=2, n=4)
1874
1875         # server 0: no shares
1876         # server 1: shares 0, 3
1877         # server 3: share 1
1878         # server 2: share 2
1879         # The order in which they are queried is 0, 1, 3, 2
1880         def _setup(ign):
1881             self._add_server(server_number=0)
1882             self._add_server_with_share(server_number=1, share_number=0)
1883             self._add_server_with_share(server_number=2, share_number=2)
1884             self._add_server_with_share(server_number=3, share_number=1)
1885             # Copy shares
1886             self._copy_share_to_server(3, 1)
1887             storedir = self.get_serverdir(0)
1888             # remove the storedir, wiping out any existing shares
1889             shutil.rmtree(storedir)
1890             # create an empty storedir to replace the one we just removed
1891             os.mkdir(storedir)
1892             client = self.g.clients[0]
1893             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1894             return client
1895
1896         d.addCallback(_setup)
1897         # Note: actually it should succeed! See
1898         # test_problem_layout_ticket_1128. But ticket #1118 is only about
1899         # making the uploader realize that it has failed, so if it raises
1900         # UploadUnhappinessError then we'll give it the green light
1901         # for now.
1902         d.addCallback(lambda ignored:
1903             self.shouldFail(UploadUnhappinessError,
1904                             "test_problem_layout_ticket_1118",
1905                             "",
1906                             self.g.clients[0].upload, upload.Data("data" * 10000,
1907                                                        convergence="")))
1908         return d
1909
1910     def test_problem_layout_ticket_1128(self):
1911         # This uses the layout from the #1118 report; ticket #1128 asks for
1912         # an uploader that succeeds with it rather than failing.
1913         self.basedir = self.mktemp()
1914         d = self._setup_and_upload(k=2, n=4)
1915
1916         # server 0: no shares
1917         # server 1: shares 0, 3
1918         # server 3: share 1
1919         # server 2: share 2
1920         # The order in which they are queried is 0, 1, 3, 2
1921         def _setup(ign):
1922             self._add_server(server_number=0)
1923             self._add_server_with_share(server_number=1, share_number=0)
1924             self._add_server_with_share(server_number=2, share_number=2)
1925             self._add_server_with_share(server_number=3, share_number=1)
1926             # Copy shares
1927             self._copy_share_to_server(3, 1)
1928             storedir = self.get_serverdir(0)
1929             # remove the storedir, wiping out any existing shares
1930             shutil.rmtree(storedir)
1931             # create an empty storedir to replace the one we just removed
1932             os.mkdir(storedir)
1933             client = self.g.clients[0]
1934             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1935             return client
1936
1937         d.addCallback(_setup)
1938         d.addCallback(lambda client:
1939                           client.upload(upload.Data("data" * 10000, convergence="")))
1940         d.addCallback(lambda ign:
1941             self.failUnless(self._has_happy_share_distribution()))
1942         return d
1943     test_problem_layout_ticket_1128.todo = "Invent a smarter uploader that uploads successfully in this case."
1944
1945     def test_upload_succeeds_with_some_homeless_shares(self):
1946         # If the upload is forced to stop trying to place shares before
1947         # it has placed (or otherwise accounted for) all of them, but it
1948         # has placed enough to satisfy the upload health criteria that
1949         # we're using, it should still succeed.
1950         self.basedir = self.mktemp()
1951         d = self._setup_and_upload()
1952         def _server_setup(ign):
1953             # Add four servers so that we have a layout like this:
1954             # server 1: share 0, read-only
1955             # server 2: share 1, read-only
1956             # server 3: share 2, read-only
1957             # server 4: share 3, read-only
1958             # If we set happy = 4, the upload will manage to satisfy
1959             # servers of happiness, but not place all of the shares; we
1960             # want to test that the upload is declared successful in
1961             # this case.
1962             self._add_server_with_share(server_number=1, share_number=0,
1963                                         readonly=True)
1964             self._add_server_with_share(server_number=2, share_number=1,
1965                                         readonly=True)
1966             self._add_server_with_share(server_number=3, share_number=2,
1967                                         readonly=True)
1968             self._add_server_with_share(server_number=4, share_number=3,
1969                                         readonly=True)
1970             # Remove server 0.
1971             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1972             # Set the client appropriately
1973             c = self.g.clients[0]
1974             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1975             return c
1976         d.addCallback(_server_setup)
1977         d.addCallback(lambda client:
1978             client.upload(upload.Data("data" * 10000, convergence="")))
1979         d.addCallback(lambda ign:
1980             self.failUnless(self._has_happy_share_distribution()))
1981         return d
1982
1983
1984     def test_uploader_skips_over_servers_with_only_one_share(self):
1985         # We want to make sure that the redistribution logic ignores
1986         # servers with only one share, since placing these shares
1987         # elsewhere will at best keep happiness the same as it was, and
1988         # at worst hurt it.
1989         self.basedir = self.mktemp()
1990         d = self._setup_and_upload()
1991         def _server_setup(ign):
1992             # Add some servers so that the upload will need to
1993             # redistribute, but will first pass over a couple of servers
1994             # that don't have enough shares to redistribute before
1995             # finding one that does have shares to redistribute.
1996             self._add_server_with_share(server_number=1, share_number=0)
1997             self._add_server_with_share(server_number=2, share_number=2)
1998             self._add_server_with_share(server_number=3, share_number=1)
1999             self._add_server_with_share(server_number=8, share_number=4)
2000             self._add_server_with_share(server_number=5, share_number=5)
2001             self._add_server_with_share(server_number=10, share_number=7)
2002             for i in xrange(4):
2003                 self._copy_share_to_server(i, 2)
2004             return self.g.clients[0]
2005         d.addCallback(_server_setup)
2006         d.addCallback(lambda client:
2007             client.upload(upload.Data("data" * 10000, convergence="")))
2008         d.addCallback(lambda ign:
2009             self.failUnless(self._has_happy_share_distribution()))
2010         return d
2011
2012
2013     def test_server_selector_bucket_abort(self):
2014         # If server selection for an upload fails due to an unhappy
2015         # layout, the server selection process should abort the buckets it
2016         # has allocated before failing, so that the space can be re-used.
2017         self.basedir = self.mktemp()
2018         self.set_up_grid(num_servers=5)
2019
2020         # Try to upload a file with happy=7, which is unsatisfiable with
2021         # the current grid. This will fail, but should not take up any
2022         # space on the storage servers after it fails.
2023         client = self.g.clients[0]
2024         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
2025         d = defer.succeed(None)
2026         d.addCallback(lambda ignored:
2027             self.shouldFail(UploadUnhappinessError,
2028                             "test_server_selection_bucket_abort",
2029                             "",
2030                             client.upload, upload.Data("data" * 10000,
2031                                                        convergence="")))
2032         # wait for the abort messages to get there.
2033         def _turn_barrier(res):
2034             return fireEventually(res)
2035         d.addCallback(_turn_barrier)
2036         def _then(ignored):
2037             for server in self.g.servers_by_number.values():
2038                 self.failUnlessEqual(server.allocated_size(), 0)
2039         d.addCallback(_then)
2040         return d
2041
2042
2043     def test_encoder_bucket_abort(self):
2044         # If enough servers die in the process of encoding and uploading
2045         # a file to make the layout unhappy, we should cancel the
2046         # newly-allocated buckets before dying.
2047         self.basedir = self.mktemp()
2048         self.set_up_grid(num_servers=4)
2049
2050         client = self.g.clients[0]
2051         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
2052
2053         d = defer.succeed(None)
2054         d.addCallback(lambda ignored:
2055             self.shouldFail(UploadUnhappinessError,
2056                             "test_encoder_bucket_abort",
2057                             "",
2058                             self._do_upload_with_broken_servers, 1))
2059         def _turn_barrier(res):
2060             return fireEventually(res)
2061         d.addCallback(_turn_barrier)
2062         def _then(ignored):
2063             for server in self.g.servers_by_number.values():
2064                 self.failUnlessEqual(server.allocated_size(), 0)
2065         d.addCallback(_then)
2066         return d
2067
2068
2069     def _set_up_nodes_extra_config(self, clientdir):
2070         cfgfn = os.path.join(clientdir, "tahoe.cfg")
2071         oldcfg = open(cfgfn, "r").read()
2072         f = open(cfgfn, "wt")
2073         f.write(oldcfg)
2074         f.write("\n")
2075         f.write("[client]\n")
2076         f.write("shares.needed = 7\n")
2077         f.write("shares.total = 12\n")
2078         f.write("\n")
2079         f.close()
2080         return None
2081
2082 # TODO:
2083 #  upload with exactly 75 servers (shares_of_happiness)
2084 #  have a download fail
2085 #  cancel a download (need to implement more cancel stuff)
2086
2087 # from test_encode:
2088 # NoNetworkGrid, upload part of ciphertext, kill server, continue upload
2089 # check with Kevan, they want to live in test_upload, existing tests might cover
2090 #     def test_lost_one_shareholder(self): # these are upload-side tests
2091 #     def test_lost_one_shareholder_early(self):
2092 #     def test_lost_many_shareholders(self):
2093 #     def test_lost_all_shareholders(self):