1 # -*- coding: utf-8 -*-
2
3 import os, shutil
4 from cStringIO import StringIO
5 from twisted.trial import unittest
6 from twisted.python.failure import Failure
7 from twisted.internet import defer
8 from foolscap.api import fireEventually
9
10 import allmydata # for __full_version__
11 from allmydata import uri, monitor, client
12 from allmydata.immutable import upload, encode
13 from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
14 from allmydata.util import log, base32
15 from allmydata.util.assertutil import precondition
16 from allmydata.util.deferredutil import DeferredListShouldSucceed
17 from allmydata.test.no_network import GridTestMixin
18 from allmydata.test.common_util import ShouldFailMixin
19 from allmydata.util.happinessutil import servers_of_happiness, \
20                                          shares_by_server, merge_servers
21 from allmydata.storage_client import StorageFarmBroker
22 from allmydata.storage.server import storage_index_to_dir
23
24 MiB = 1024*1024
25
26 def extract_uri(results):
27     return results.get_uri()
28
29 # Some of these took longer than 480 seconds on Zandr's arm box, but this may
30 # have been due to an earlier test ERROR'ing out due to timeout, which seems
31 # to screw up subsequent tests.
32 timeout = 960
33
34 class Uploadable(unittest.TestCase):
35     def shouldEqual(self, data, expected):
36         self.failUnless(isinstance(data, list))
37         for e in data:
38             self.failUnless(isinstance(e, str))
39         s = "".join(data)
40         self.failUnlessEqual(s, expected)
41
42     def test_filehandle_random_key(self):
43         return self._test_filehandle(convergence=None)
44
45     def test_filehandle_convergent_encryption(self):
46         return self._test_filehandle(convergence="some convergence string")
47
48     def _test_filehandle(self, convergence):
49         s = StringIO("a"*41)
50         u = upload.FileHandle(s, convergence=convergence)
51         d = u.get_size()
52         d.addCallback(self.failUnlessEqual, 41)
53         d.addCallback(lambda res: u.read(1))
54         d.addCallback(self.shouldEqual, "a")
55         d.addCallback(lambda res: u.read(80))
56         d.addCallback(self.shouldEqual, "a"*40)
57         d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
58         d.addCallback(lambda res: s.close()) # that privilege is reserved for us
59         return d
60
61     def test_filename(self):
62         basedir = "upload/Uploadable/test_filename"
63         os.makedirs(basedir)
64         fn = os.path.join(basedir, "file")
65         f = open(fn, "w")
66         f.write("a"*41)
67         f.close()
68         u = upload.FileName(fn, convergence=None)
69         d = u.get_size()
70         d.addCallback(self.failUnlessEqual, 41)
71         d.addCallback(lambda res: u.read(1))
72         d.addCallback(self.shouldEqual, "a")
73         d.addCallback(lambda res: u.read(80))
74         d.addCallback(self.shouldEqual, "a"*40)
75         d.addCallback(lambda res: u.close())
76         return d
77
78     def test_data(self):
79         s = "a"*41
80         u = upload.Data(s, convergence=None)
81         d = u.get_size()
82         d.addCallback(self.failUnlessEqual, 41)
83         d.addCallback(lambda res: u.read(1))
84         d.addCallback(self.shouldEqual, "a")
85         d.addCallback(lambda res: u.read(80))
86         d.addCallback(self.shouldEqual, "a"*40)
87         d.addCallback(lambda res: u.close())
88         return d
89
90 class ServerError(Exception):
91     pass
92
93 class SetDEPMixin:
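    # convenience mixin: lets a test override the fake client's default
    # encoding parameters (k, happy, n, max segment size) in a single call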
94     def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
95         p = {"k": k,
96              "happy": happy,
97              "n": n,
98              "max_segment_size": max_segsize,
99              }
100         self.node.DEFAULT_ENCODING_PARAMETERS = p
101
102 class FakeStorageServer:
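    # in-memory stand-in for a storage server; 'mode' controls how it
    # responds to allocate_buckets (e.g. "full", "small", "first-fail")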
103     def __init__(self, mode):
104         self.mode = mode
105         self.allocated = []
106         self.queries = 0
107         self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
108                          { "maximum-immutable-share-size": 2**32 },
109                          "application-version": str(allmydata.__full_version__),
110                          }
111         if mode == "small":
112             self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
113                              { "maximum-immutable-share-size": 10 },
114                              "application-version": str(allmydata.__full_version__),
115                              }
116
117
118     def callRemote(self, methname, *args, **kwargs):
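        # emulate a Foolscap remote call: dispatch to the local method of the
        # same name, but only on a later reactor turn (fireEventually), so the
        # caller always gets a Deferred back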
119         def _call():
120             meth = getattr(self, methname)
121             return meth(*args, **kwargs)
122         d = fireEventually()
123         d.addCallback(lambda res: _call())
124         return d
125
126     def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
127                          sharenums, share_size, canary):
128         #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
129         if self.mode == "first-fail":
130             if self.queries == 0:
131                 raise ServerError
132         if self.mode == "second-fail":
133             if self.queries == 1:
134                 raise ServerError
135         self.queries += 1
136         if self.mode == "full":
137             return (set(), {},)
138         elif self.mode == "already got them":
139             return (set(sharenums), {},)
140         else:
141             for shnum in sharenums:
142                 self.allocated.append( (storage_index, shnum) )
143             return (set(),
144                     dict([( shnum, FakeBucketWriter(share_size) )
145                           for shnum in sharenums]),
146                     )
147
148 class FakeBucketWriter:
149     # a diagnostic version of storageserver.BucketWriter
150     def __init__(self, size):
151         self.data = StringIO()
152         self.closed = False
153         self._size = size
154
155     def callRemote(self, methname, *args, **kwargs):
156         def _call():
157             meth = getattr(self, "remote_" + methname)
158             return meth(*args, **kwargs)
159         d = fireEventually()
160         d.addCallback(lambda res: _call())
161         return d
162
163
164     def callRemoteOnly(self, methname, *args, **kwargs):
165         d = self.callRemote(methname, *args, **kwargs)
166         del d # callRemoteOnly ignores this
167         return None
168
169
170     def remote_write(self, offset, data):
171         precondition(not self.closed)
172         precondition(offset >= 0)
173         precondition(offset+len(data) <= self._size,
174                      "offset=%d + data=%d > size=%d" %
175                      (offset, len(data), self._size))
176         self.data.seek(offset)
177         self.data.write(data)
178
179     def remote_close(self):
180         precondition(not self.closed)
181         self.closed = True
182
183     def remote_abort(self):
184         pass
185
186 class FakeClient:
187     DEFAULT_ENCODING_PARAMETERS = {"k":25,
188                                    "happy": 25,
189                                    "n": 100,
190                                    "max_segment_size": 1*MiB,
191                                    }
192     def __init__(self, mode="good", num_servers=50):
193         self.num_servers = num_servers
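        # 'mode' may be a single string (applied to every fake server) or a
        # dict mapping server number -> mode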
194         if type(mode) is str:
195             mode = dict([i,mode] for i in range(num_servers))
196         servers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
197                     for fakeid in range(self.num_servers) ]
198         self.storage_broker = StorageFarmBroker(None, permute_peers=True)
199         for (serverid, rref) in servers:
200             ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid),
201                    "permutation-seed-base32": base32.b2a(serverid) }
202             self.storage_broker.test_add_rref(serverid, rref, ann)
203         self.last_servers = [s[1] for s in servers]
204
205     def log(self, *args, **kwargs):
206         pass
207     def get_encoding_parameters(self):
208         return self.DEFAULT_ENCODING_PARAMETERS
209     def get_storage_broker(self):
210         return self.storage_broker
211     _secret_holder = client.SecretHolder("lease secret", "convergence secret")
212
213 class GotTooFarError(Exception):
214     pass
215
216 class GiganticUploadable(upload.FileHandle):
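    # an uploadable that only claims to be `size` bytes long; read() synthesizes
    # zero bytes on demand, so the full "file" is never held in memory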
217     def __init__(self, size):
218         self._size = size
219         self._fp = 0
220
221     def get_encryption_key(self):
222         return defer.succeed("\x00" * 16)
223     def get_size(self):
224         return defer.succeed(self._size)
225     def read(self, length):
226         left = self._size - self._fp
227         length = min(left, length)
228         self._fp += length
229         if self._fp > 1000000:
230             # terminate the test early.
231             raise GotTooFarError("we shouldn't be allowed to get this far")
232         return defer.succeed(["\x00" * length])
233     def close(self):
234         pass
235
236 DATA = """
237 Once upon a time, there was a beautiful princess named Buttercup. She lived
238 in a magical land where every file was stored securely among millions of
239 machines, and nobody ever worried about their data being lost ever again.
240 The End.
241 """
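# DATA must be longer than the LIT threshold, so uploading it produces a real
# CHK file rather than an inline LiteralFileURI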
242 assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
243
244 SIZE_ZERO = 0
245 SIZE_SMALL = 16
246 SIZE_LARGE = len(DATA)
247
248 def upload_data(uploader, data):
249     u = upload.Data(data, convergence=None)
250     return uploader.upload(u)
251 def upload_filename(uploader, filename):
252     u = upload.FileName(filename, convergence=None)
253     return uploader.upload(u)
254 def upload_filehandle(uploader, fh):
255     u = upload.FileHandle(fh, convergence=None)
256     return uploader.upload(u)
257
258 class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
259     def setUp(self):
260         self.node = FakeClient(mode="good")
261         self.u = upload.Uploader()
262         self.u.running = True
263         self.u.parent = self.node
264
265     def _check_small(self, newuri, size):
266         u = uri.from_string(newuri)
267         self.failUnless(isinstance(u, uri.LiteralFileURI))
268         self.failUnlessEqual(len(u.data), size)
269
270     def _check_large(self, newuri, size):
271         u = uri.from_string(newuri)
272         self.failUnless(isinstance(u, uri.CHKFileURI))
273         self.failUnless(isinstance(u.get_storage_index(), str))
274         self.failUnlessEqual(len(u.get_storage_index()), 16)
275         self.failUnless(isinstance(u.key, str))
276         self.failUnlessEqual(len(u.key), 16)
277         self.failUnlessEqual(u.size, size)
278
279     def get_data(self, size):
280         return DATA[:size]
281
282     def test_too_large(self):
283         # we've removed the 4GiB share size limit (see ticket #346 for
284         # details), but still have an 8-byte field, so the limit is now
285         # 2**64, so make sure we reject files larger than that.
286         k = 3; happy = 7; n = 10
287         self.set_encoding_parameters(k, happy, n)
288         big = k*(2**64)
289         data1 = GiganticUploadable(big)
290         d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
291                             "This file is too large to be uploaded (data_size)",
292                             self.u.upload, data1)
293         data2 = GiganticUploadable(big-3)
294         d.addCallback(lambda res:
295                       self.shouldFail(FileTooLargeError,
296                                       "test_too_large-data2",
297                                       "This file is too large to be uploaded (offsets)",
298                                       self.u.upload, data2))
299         # I don't know where the actual limit is... it depends upon how large
300         # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
301         return d
302
303     def test_data_zero(self):
304         data = self.get_data(SIZE_ZERO)
305         d = upload_data(self.u, data)
306         d.addCallback(extract_uri)
307         d.addCallback(self._check_small, SIZE_ZERO)
308         return d
309
310     def test_data_small(self):
311         data = self.get_data(SIZE_SMALL)
312         d = upload_data(self.u, data)
313         d.addCallback(extract_uri)
314         d.addCallback(self._check_small, SIZE_SMALL)
315         return d
316
317     def test_data_large(self):
318         data = self.get_data(SIZE_LARGE)
319         d = upload_data(self.u, data)
320         d.addCallback(extract_uri)
321         d.addCallback(self._check_large, SIZE_LARGE)
322         return d
323
324     def test_data_large_odd_segments(self):
325         data = self.get_data(SIZE_LARGE)
326         segsize = int(SIZE_LARGE / 2.5)
327         # we want 3 segments, since that's not a power of two
328         self.set_encoding_parameters(25, 25, 100, segsize)
329         d = upload_data(self.u, data)
330         d.addCallback(extract_uri)
331         d.addCallback(self._check_large, SIZE_LARGE)
332         return d
333
334     def test_filehandle_zero(self):
335         data = self.get_data(SIZE_ZERO)
336         d = upload_filehandle(self.u, StringIO(data))
337         d.addCallback(extract_uri)
338         d.addCallback(self._check_small, SIZE_ZERO)
339         return d
340
341     def test_filehandle_small(self):
342         data = self.get_data(SIZE_SMALL)
343         d = upload_filehandle(self.u, StringIO(data))
344         d.addCallback(extract_uri)
345         d.addCallback(self._check_small, SIZE_SMALL)
346         return d
347
348     def test_filehandle_large(self):
349         data = self.get_data(SIZE_LARGE)
350         d = upload_filehandle(self.u, StringIO(data))
351         d.addCallback(extract_uri)
352         d.addCallback(self._check_large, SIZE_LARGE)
353         return d
354
355     def test_filename_zero(self):
356         fn = "Uploader-test_filename_zero.data"
357         f = open(fn, "wb")
358         data = self.get_data(SIZE_ZERO)
359         f.write(data)
360         f.close()
361         d = upload_filename(self.u, fn)
362         d.addCallback(extract_uri)
363         d.addCallback(self._check_small, SIZE_ZERO)
364         return d
365
366     def test_filename_small(self):
367         fn = "Uploader-test_filename_small.data"
368         f = open(fn, "wb")
369         data = self.get_data(SIZE_SMALL)
370         f.write(data)
371         f.close()
372         d = upload_filename(self.u, fn)
373         d.addCallback(extract_uri)
374         d.addCallback(self._check_small, SIZE_SMALL)
375         return d
376
377     def test_filename_large(self):
378         fn = "Uploader-test_filename_large.data"
379         f = open(fn, "wb")
380         data = self.get_data(SIZE_LARGE)
381         f.write(data)
382         f.close()
383         d = upload_filename(self.u, fn)
384         d.addCallback(extract_uri)
385         d.addCallback(self._check_large, SIZE_LARGE)
386         return d
387
388 class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
389     def make_node(self, mode, num_servers=10):
390         self.node = FakeClient(mode, num_servers)
391         self.u = upload.Uploader()
392         self.u.running = True
393         self.u.parent = self.node
394
395     def _check_large(self, newuri, size):
396         u = uri.from_string(newuri)
397         self.failUnless(isinstance(u, uri.CHKFileURI))
398         self.failUnless(isinstance(u.get_storage_index(), str))
399         self.failUnlessEqual(len(u.get_storage_index()), 16)
400         self.failUnless(isinstance(u.key, str))
401         self.failUnlessEqual(len(u.key), 16)
402         self.failUnlessEqual(u.size, size)
403
404     def test_first_error(self):
405         mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
406         self.make_node(mode)
407         self.set_encoding_parameters(k=25, happy=1, n=50)
408         d = upload_data(self.u, DATA)
409         d.addCallback(extract_uri)
410         d.addCallback(self._check_large, SIZE_LARGE)
411         return d
412
413     def test_first_error_all(self):
414         self.make_node("first-fail")
415         d = self.shouldFail(UploadUnhappinessError, "first_error_all",
416                             "server selection failed",
417                             upload_data, self.u, DATA)
418         def _check((f,)):
419             self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
420             # there should also be a 'last failure was' message
421             self.failUnlessIn("ServerError", str(f.value))
422         d.addCallback(_check)
423         return d
424
425     def test_second_error(self):
426         # we want to make sure we make it to a third pass. This means that
427         # the first pass was insufficient to place all shares, and at least
428         # one of the second-pass servers (other than the last one) accepted a
429         # share (so we'll believe that a third pass will be useful). (if
430         # everyone but the last server throws an error, then we'll send all
431         # the remaining shares to the last server at the end of the second
432         # pass, and if that succeeds, we won't make it to a third pass).
433         #
434         # we can achieve this 97.5% of the time by using 40 servers, having
435         # 39 of them fail on the second request, leaving only one to succeed
436         # on the second request. (we need to keep the number of servers low
437         # enough to ensure a second pass with 100 shares).
438         mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
439         self.make_node(mode, 40)
440         d = upload_data(self.u, DATA)
441         d.addCallback(extract_uri)
442         d.addCallback(self._check_large, SIZE_LARGE)
443         return d
444
445     def test_second_error_all(self):
446         self.make_node("second-fail")
447         d = self.shouldFail(UploadUnhappinessError, "second_error_all",
448                             "server selection failed",
449                             upload_data, self.u, DATA)
450         def _check((f,)):
451             self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
452             # there should also be a 'last failure was' message
453             self.failUnlessIn("ServerError", str(f.value))
454         d.addCallback(_check)
455         return d
456
457 class FullServer(unittest.TestCase):
458     def setUp(self):
459         self.node = FakeClient(mode="full")
460         self.u = upload.Uploader()
461         self.u.running = True
462         self.u.parent = self.node
463
464     def _should_fail(self, f):
465         self.failUnless(isinstance(f, Failure) and f.check(UploadUnhappinessError), f)
466
467     def test_data_large(self):
468         data = DATA
469         d = upload_data(self.u, data)
470         d.addBoth(self._should_fail)
471         return d
472
473 class ServerSelection(unittest.TestCase):
474
475     def make_client(self, num_servers=50):
476         self.node = FakeClient(mode="good", num_servers=num_servers)
477         self.u = upload.Uploader()
478         self.u.running = True
479         self.u.parent = self.node
480
481     def get_data(self, size):
482         return DATA[:size]
483
484     def _check_large(self, newuri, size):
485         u = uri.from_string(newuri)
486         self.failUnless(isinstance(u, uri.CHKFileURI))
487         self.failUnless(isinstance(u.get_storage_index(), str))
488         self.failUnlessEqual(len(u.get_storage_index()), 16)
489         self.failUnless(isinstance(u.key, str))
490         self.failUnlessEqual(len(u.key), 16)
491         self.failUnlessEqual(u.size, size)
492
493     def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
494         p = {"k": k,
495              "happy": happy,
496              "n": n,
497              "max_segment_size": max_segsize,
498              }
499         self.node.DEFAULT_ENCODING_PARAMETERS = p
500
501     def test_one_each(self):
502         # if we have 50 shares, and there are 50 servers, and they all accept
503         # a share, we should get exactly one share per server
504
505         self.make_client()
506         data = self.get_data(SIZE_LARGE)
507         self.set_encoding_parameters(25, 30, 50)
508         d = upload_data(self.u, data)
509         d.addCallback(extract_uri)
510         d.addCallback(self._check_large, SIZE_LARGE)
511         def _check(res):
512             for s in self.node.last_servers:
513                 allocated = s.allocated
514                 self.failUnlessEqual(len(allocated), 1)
515                 self.failUnlessEqual(s.queries, 1)
516         d.addCallback(_check)
517         return d
518
519     def test_two_each(self):
520         # if we have 100 shares, and there are 50 servers, and they all
521         # accept all shares, we should get exactly two shares per server
522
523         self.make_client()
524         data = self.get_data(SIZE_LARGE)
525         # if there are 50 servers, then happy needs to be <= 50
526         self.set_encoding_parameters(50, 50, 100)
527         d = upload_data(self.u, data)
528         d.addCallback(extract_uri)
529         d.addCallback(self._check_large, SIZE_LARGE)
530         def _check(res):
531             for s in self.node.last_servers:
532                 allocated = s.allocated
533                 self.failUnlessEqual(len(allocated), 2)
534                 self.failUnlessEqual(s.queries, 2)
535         d.addCallback(_check)
536         return d
537
538     def test_one_each_plus_one_extra(self):
539         # if we have 51 shares, and there are 50 servers, then one server
540         # gets two shares and the rest get just one
541
542         self.make_client()
543         data = self.get_data(SIZE_LARGE)
544         self.set_encoding_parameters(24, 41, 51)
545         d = upload_data(self.u, data)
546         d.addCallback(extract_uri)
547         d.addCallback(self._check_large, SIZE_LARGE)
548         def _check(res):
549             got_one = []
550             got_two = []
551             for s in self.node.last_servers:
552                 allocated = s.allocated
553                 self.failUnless(len(allocated) in (1,2), len(allocated))
554                 if len(allocated) == 1:
555                     self.failUnlessEqual(s.queries, 1)
556                     got_one.append(s)
557                 else:
558                     self.failUnlessEqual(s.queries, 2)
559                     got_two.append(s)
560             self.failUnlessEqual(len(got_one), 49)
561             self.failUnlessEqual(len(got_two), 1)
562         d.addCallback(_check)
563         return d
564
565     def test_four_each(self):
566         # if we have 200 shares, and there are 50 servers, then each server
567         # gets 4 shares. The design goal is to accomplish this with only two
568         # queries per server.
569
570         self.make_client()
571         data = self.get_data(SIZE_LARGE)
572         # if there are 50 servers, then happy should be no more than 50 if we
573         # want this to work.
574         self.set_encoding_parameters(100, 50, 200)
575         d = upload_data(self.u, data)
576         d.addCallback(extract_uri)
577         d.addCallback(self._check_large, SIZE_LARGE)
578         def _check(res):
579             for s in self.node.last_servers:
580                 allocated = s.allocated
581                 self.failUnlessEqual(len(allocated), 4)
582                 self.failUnlessEqual(s.queries, 2)
583         d.addCallback(_check)
584         return d
585
586     def test_three_of_ten(self):
587         # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
588         # 4+4+2
589
590         self.make_client(3)
591         data = self.get_data(SIZE_LARGE)
592         self.set_encoding_parameters(3, 3, 10)
593         d = upload_data(self.u, data)
594         d.addCallback(extract_uri)
595         d.addCallback(self._check_large, SIZE_LARGE)
596         def _check(res):
597             counts = {}
598             for s in self.node.last_servers:
599                 allocated = s.allocated
600                 counts[len(allocated)] = counts.get(len(allocated), 0) + 1
601             histogram = [counts.get(i, 0) for i in range(5)]
602             self.failUnlessEqual(histogram, [0,0,0,2,1])
603         d.addCallback(_check)
604         return d
605
606     def test_some_big_some_small(self):
607         # 10 shares, 20 servers, but half the servers don't support a
608         # share-size large enough for our file
609         mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)])
610         self.node = FakeClient(mode, num_servers=20)
611         self.u = upload.Uploader()
612         self.u.running = True
613         self.u.parent = self.node
614
615         data = self.get_data(SIZE_LARGE)
616         self.set_encoding_parameters(3, 5, 10)
617         d = upload_data(self.u, data)
618         d.addCallback(extract_uri)
619         d.addCallback(self._check_large, SIZE_LARGE)
620         def _check(res):
621             # we should have put one share each on the big servers, and zero
622             # shares on the small servers
623             total_allocated = 0
624             for p in self.node.last_servers:
625                 if p.mode == "good":
626                     self.failUnlessEqual(len(p.allocated), 1)
627                 elif p.mode == "small":
628                     self.failUnlessEqual(len(p.allocated), 0)
629                 total_allocated += len(p.allocated)
630             self.failUnlessEqual(total_allocated, 10)
631         d.addCallback(_check)
632         return d
633
634
635 class StorageIndex(unittest.TestCase):
636     def test_params_must_matter(self):
637         DATA = "I am some data"
638         u = upload.Data(DATA, convergence="")
639         eu = upload.EncryptAnUploadable(u)
640         d1 = eu.get_storage_index()
641
642         # CHK means the same data should encrypt the same way
643         u = upload.Data(DATA, convergence="")
644         eu = upload.EncryptAnUploadable(u)
645         d1a = eu.get_storage_index()
646
647         # but if we use a different convergence string it should be different
648         u = upload.Data(DATA, convergence="wheee!")
649         eu = upload.EncryptAnUploadable(u)
650         d1salt1 = eu.get_storage_index()
651
652         # and if we add yet a different convergence it should be different again
653         u = upload.Data(DATA, convergence="NOT wheee!")
654         eu = upload.EncryptAnUploadable(u)
655         d1salt2 = eu.get_storage_index()
656
657         # and if we use the first string again it should be the same as last time
658         u = upload.Data(DATA, convergence="wheee!")
659         eu = upload.EncryptAnUploadable(u)
660         d1salt1a = eu.get_storage_index()
661
662         # and if we change the encoding parameters, it should be different (from the same convergence string with different encoding parameters)
663         u = upload.Data(DATA, convergence="")
664         u.encoding_param_k = u.default_encoding_param_k + 1
665         eu = upload.EncryptAnUploadable(u)
666         d2 = eu.get_storage_index()
667
668         # and if we use a random key, it should be different than the CHK
669         u = upload.Data(DATA, convergence=None)
670         eu = upload.EncryptAnUploadable(u)
671         d3 = eu.get_storage_index()
672         # and different from another instance
673         u = upload.Data(DATA, convergence=None)
674         eu = upload.EncryptAnUploadable(u)
675         d4 = eu.get_storage_index()
676
677         d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
678         def _done(res):
679             si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
680             self.failUnlessEqual(si1, si1a)
681             self.failIfEqual(si1, si2)
682             self.failIfEqual(si1, si3)
683             self.failIfEqual(si1, si4)
684             self.failIfEqual(si3, si4)
685             self.failIfEqual(si1salt1, si1)
686             self.failIfEqual(si1salt1, si1salt2)
687             self.failIfEqual(si1salt2, si1)
688             self.failUnlessEqual(si1salt1, si1salt1a)
689         d.addCallback(_done)
690         return d
691
692 # copied from python docs because itertools.combinations was added in
693 # python 2.6 and we support >= 2.4.
694 def combinations(iterable, r):
695     # combinations('ABCD', 2) --> AB AC AD BC BD CD
696     # combinations(range(4), 3) --> 012 013 023 123
697     pool = tuple(iterable)
698     n = len(pool)
699     if r > n:
700         return
701     indices = range(r)
702     yield tuple(pool[i] for i in indices)
703     while True:
704         for i in reversed(range(r)):
705             if indices[i] != i + n - r:
706                 break
707         else:
708             return
709         indices[i] += 1
710         for j in range(i+1, r):
711             indices[j] = indices[j-1] + 1
712         yield tuple(pool[i] for i in indices)
713
714 def is_happy_enough(servertoshnums, h, k):
715     """ I calculate whether servertoshnums achieves happiness level h. I do this with a naïve "brute force search" approach. (See src/allmydata/util/happinessutil.py for a better algorithm.) """
716     if len(servertoshnums) < h:
717         return False
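    # brute force: for every choice of h servers, every k of those servers must
    # together hold at least k distinct share numbers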
718     # print "servertoshnums: ", servertoshnums, h, k
719     for happysetcombo in combinations(servertoshnums.iterkeys(), h):
720         # print "happysetcombo: ", happysetcombo
721         for subsetcombo in combinations(happysetcombo, k):
722             shnums = reduce(set.union, [ servertoshnums[s] for s in subsetcombo ])
723             # print "subsetcombo: ", subsetcombo, ", shnums: ", shnums
724             if len(shnums) < k:
725                 # print "NOT HAPPY", shnums, k
726                 return False
727     # print "HAPPY"
728     return True
729
730 class FakeServerTracker:
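    # minimal stand-in for the upload code's ServerTracker: just a serverid and
    # a dict of bucket writers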
731     def __init__(self, serverid, buckets):
732         self._serverid = serverid
733         self.buckets = buckets
734     def get_serverid(self):
735         return self._serverid
736
737 class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
738     ShouldFailMixin):
739     def find_all_shares(self, unused=None):
740         """Locate shares on disk. Returns a dict that maps
741         server to set of sharenums.
742         """
743         assert self.g, "I tried to find a grid at self.g, but failed"
744         servertoshnums = {} # k: server, v: set(shnum)
745
746         for i, c in self.g.servers_by_number.iteritems():
747             for (dirp, dirns, fns) in os.walk(c.sharedir):
748                 for fn in fns:
749                     try:
750                         sharenum = int(fn)
751                     except ValueError:
752                         # Whoops, I guess that's not a share file then.
753                         pass
754                     else:
755                         servertoshnums.setdefault(i, set()).add(sharenum)
756
757         return servertoshnums
758
759     def _do_upload_with_broken_servers(self, servers_to_break):
760         """
761         I act like a normal upload, but before I send the results of
762         Tahoe2ServerSelector to the Encoder, I break the first
763         servers_to_break ServerTrackers in the upload_servers part of the
764         return result.
765         """
766         assert self.g, "I tried to find a grid at self.g, but failed"
767         broker = self.g.clients[0].storage_broker
768         sh     = self.g.clients[0]._secret_holder
769         data = upload.Data("data" * 10000, convergence="")
770         data.encoding_param_k = 3
771         data.encoding_param_happy = 4
772         data.encoding_param_n = 10
773         uploadable = upload.EncryptAnUploadable(data)
774         encoder = encode.Encoder()
775         encoder.set_encrypted_uploadable(uploadable)
776         status = upload.UploadStatus()
777         selector = upload.Tahoe2ServerSelector("dglev", "test", status)
778         storage_index = encoder.get_param("storage_index")
779         share_size = encoder.get_param("share_size")
780         block_size = encoder.get_param("block_size")
781         num_segments = encoder.get_param("num_segments")
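        # the trailing 10, 3, 4 in the call below mirror the n/k/happy encoding
        # parameters set on `data` above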
782         d = selector.get_shareholders(broker, sh, storage_index,
783                                       share_size, block_size, num_segments,
784                                       10, 3, 4)
785         def _have_shareholders((upload_trackers, already_servers)):
786             assert servers_to_break <= len(upload_trackers)
787             for index in xrange(servers_to_break):
788                 tracker = list(upload_trackers)[index]
789                 for share in tracker.buckets.keys():
790                     tracker.buckets[share].abort()
791             buckets = {}
792             servermap = already_servers.copy()
793             for tracker in upload_trackers:
794                 buckets.update(tracker.buckets)
795                 for bucket in tracker.buckets:
796                     servermap.setdefault(bucket, set()).add(tracker.get_serverid())
797             encoder.set_shareholders(buckets, servermap)
798             d = encoder.start()
799             return d
800         d.addCallback(_have_shareholders)
801         return d
802
803     def _has_happy_share_distribution(self):
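        # re-scan the shares on disk and check them against the client's current
        # k and happy parameters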
804         servertoshnums = self.find_all_shares()
805         k = self.g.clients[0].DEFAULT_ENCODING_PARAMETERS['k']
806         h = self.g.clients[0].DEFAULT_ENCODING_PARAMETERS['happy']
807         return is_happy_enough(servertoshnums, h, k)
808
809     def _add_server(self, server_number, readonly=False):
810         assert self.g, "I tried to find a grid at self.g, but failed"
811         ss = self.g.make_server(server_number, readonly)
812         log.msg("just created a server, number: %s => %s" % (server_number, ss,))
813         self.g.add_server(server_number, ss)
814
815     def _add_server_with_share(self, server_number, share_number=None,
816                                readonly=False):
817         self._add_server(server_number, readonly)
818         if share_number is not None:
819             self._copy_share_to_server(share_number, server_number)
820
821
822     def _copy_share_to_server(self, share_number, server_number):
823         ss = self.g.servers_by_number[server_number]
824         # Copy share i from the directory associated with the first
825         # storage server to the directory associated with this one.
826         assert self.g, "I tried to find a grid at self.g, but failed"
827         assert self.shares, "I tried to find shares at self.shares, but failed"
828         old_share_location = self.shares[share_number][2]
829         new_share_location = os.path.join(ss.storedir, "shares")
830         si = uri.from_string(self.uri).get_storage_index()
831         new_share_location = os.path.join(new_share_location,
832                                           storage_index_to_dir(si))
833         if not os.path.exists(new_share_location):
834             os.makedirs(new_share_location)
835         new_share_location = os.path.join(new_share_location,
836                                           str(share_number))
837         if old_share_location != new_share_location:
838             shutil.copy(old_share_location, new_share_location)
839         shares = self.find_uri_shares(self.uri)
840         # Make sure that the storage server has the share.
841         self.failUnless((share_number, ss.my_nodeid, new_share_location)
842                         in shares)
843
844     def _setup_grid(self):
845         """
846         I set up a NoNetworkGrid with a single server and client.
847         """
848         self.set_up_grid(num_clients=1, num_servers=1)
849
850     def _setup_and_upload(self, **kwargs):
851         """
852         I set up a NoNetworkGrid with a single server and client,
853         upload a file to it, store its uri in self.uri, and store its
854         sharedata in self.shares.
855         """
856         self._setup_grid()
857         client = self.g.clients[0]
858         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
859         if "n" in kwargs and "k" in kwargs:
860             client.DEFAULT_ENCODING_PARAMETERS['k'] = kwargs['k']
861             client.DEFAULT_ENCODING_PARAMETERS['n'] = kwargs['n']
862         data = upload.Data("data" * 10000, convergence="")
863         self.data = data
864         d = client.upload(data)
865         def _store_uri(ur):
866             self.uri = ur.get_uri()
867         d.addCallback(_store_uri)
868         d.addCallback(lambda ign:
869             self.find_uri_shares(self.uri))
870         def _store_shares(shares):
871             self.shares = shares
872         d.addCallback(_store_shares)
873         return d
874
875     def test_configure_parameters(self):
876         self.basedir = self.mktemp()
877         hooks = {0: self._set_up_nodes_extra_config}
878         self.set_up_grid(client_config_hooks=hooks)
879         c0 = self.g.clients[0]
880
881         DATA = "data" * 100
882         u = upload.Data(DATA, convergence="")
883         d = c0.upload(u)
884         d.addCallback(lambda ur: c0.create_node_from_uri(ur.get_uri()))
885         m = monitor.Monitor()
886         d.addCallback(lambda fn: fn.check(m))
887         def _check(cr):
888             self.failUnlessEqual(cr.get_encoding_needed(), 7)
889             self.failUnlessEqual(cr.get_encoding_expected(), 12)
890         d.addCallback(_check)
891         return d
892
893
894     def _setUp(self, ns):
895         # Used by test_happy_semantics and test_preexisting_share_behavior
896         # to set up the grid.
897         self.node = FakeClient(mode="good", num_servers=ns)
898         self.u = upload.Uploader()
899         self.u.running = True
900         self.u.parent = self.node
901
902
903     def test_happy_semantics(self):
904         self._setUp(2)
905         DATA = upload.Data("kittens" * 10000, convergence="")
906         # These parameters are unsatisfiable with only 2 servers.
907         self.set_encoding_parameters(k=3, happy=5, n=10)
908         d = self.shouldFail(UploadUnhappinessError, "test_happy_semantics",
909                             "shares could be placed or found on only 2 "
910                             "server(s). We were asked to place shares on "
911                             "at least 5 server(s) such that any 3 of them "
912                             "have enough shares to recover the file",
913                             self.u.upload, DATA)
914         # Let's reset the client to have 10 servers
915         d.addCallback(lambda ign:
916             self._setUp(10))
917         # These parameters are satisfiable with 10 servers.
918         d.addCallback(lambda ign:
919             self.set_encoding_parameters(k=3, happy=5, n=10))
920         d.addCallback(lambda ign:
921             self.u.upload(DATA))
922         # Let's reset the client to have 7 servers
923         # (this is less than n, but more than h)
924         d.addCallback(lambda ign:
925             self._setUp(7))
926         # These parameters are satisfiable with 7 servers.
927         d.addCallback(lambda ign:
928             self.set_encoding_parameters(k=3, happy=5, n=10))
929         d.addCallback(lambda ign:
930             self.u.upload(DATA))
931         return d
932
933     def test_aborted_shares(self):
934         self.basedir = "upload/EncodingParameters/aborted_shares"
935         self.set_up_grid(num_servers=4)
936         c = self.g.clients[0]
937         DATA = upload.Data(100* "kittens", convergence="")
938         # These parameters are unsatisfiable with only 4 servers, but should
939         # work with 5, as long as the original 4 are not stuck in the open
940         # BucketWriter state (open() but not closed).
941         parms = {"k":2, "happy":5, "n":5, "max_segment_size": 1*MiB}
942         c.DEFAULT_ENCODING_PARAMETERS = parms
943         d = self.shouldFail(UploadUnhappinessError, "test_aborted_shares",
944                             "shares could be placed on only 4 "
945                             "server(s) such that any 2 of them have enough "
946                             "shares to recover the file, but we were asked "
947                             "to place shares on at least 5 such servers",
948                             c.upload, DATA)
949         # now add the 5th server
950         d.addCallback(lambda ign: self._add_server(4, False))
951         # and this time the upload ought to succeed
952         d.addCallback(lambda ign: c.upload(DATA))
953         d.addCallback(lambda ign:
954             self.failUnless(self._has_happy_share_distribution()))
955         return d
956
957
958     def test_problem_layout_comment_52(self):
959         def _basedir():
960             self.basedir = self.mktemp()
961         _basedir()
962         # This scenario is at
963         # http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:52
964         #
965         # The scenario in comment:52 proposes that we have a layout
966         # like:
967         # server 0: shares 1 - 9
968         # server 1: share 0, read-only
969         # server 2: share 0, read-only
970         # server 3: share 0, read-only
971         # To get access to the shares, we will first upload to one
972         # server, which will then have shares 0 - 9. We'll then
973         # add three new servers, configure them to not accept any new
974         # shares, then write share 0 directly into the serverdir of each,
975         # and then remove share 0 from server 0 in the same way.
976         # Then each of servers 1 - 3 will report that they have share 0,
977         # and will not accept any new share, while server 0 will report that
978         # it has shares 1 - 9 and will accept new shares.
979         # We'll then set 'happy' = 4, and see that an upload fails
980         # (as it should)
981         d = self._setup_and_upload()
982         d.addCallback(lambda ign:
983             self._add_server_with_share(server_number=1, share_number=0,
984                                         readonly=True))
985         d.addCallback(lambda ign:
986             self._add_server_with_share(server_number=2, share_number=0,
987                                         readonly=True))
988         d.addCallback(lambda ign:
989             self._add_server_with_share(server_number=3, share_number=0,
990                                         readonly=True))
991         # Remove the first share from server 0.
992         def _remove_share_0_from_server_0():
993             share_location = self.shares[0][2]
994             os.remove(share_location)
995         d.addCallback(lambda ign:
996             _remove_share_0_from_server_0())
997         # Set happy = 4 in the client.
998         def _prepare():
999             client = self.g.clients[0]
1000             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1001             return client
1002         d.addCallback(lambda ign:
1003             _prepare())
1004         # Uploading data should fail
1005         d.addCallback(lambda client:
1006             self.shouldFail(UploadUnhappinessError,
1007                             "test_problem_layout_comment_52_test_1",
1008                             "shares could be placed or found on 4 server(s), "
1009                             "but they are not spread out evenly enough to "
1010                             "ensure that any 3 of these servers would have "
1011                             "enough shares to recover the file. "
1012                             "We were asked to place shares on at "
1013                             "least 4 servers such that any 3 of them have "
1014                             "enough shares to recover the file",
1015                             client.upload, upload.Data("data" * 10000,
1016                                                        convergence="")))
1017
1018         # Do comment:52, but like this:
1019         # server 2: empty
1020         # server 3: share 0, read-only
1021         # server 1: share 0, read-only
1022         # server 0: shares 0-9
1023         d.addCallback(lambda ign:
1024             _basedir())
1025         d.addCallback(lambda ign:
1026             self._setup_and_upload())
1027         d.addCallback(lambda ign:
1028             self._add_server(server_number=2))
1029         d.addCallback(lambda ign:
1030             self._add_server_with_share(server_number=3, share_number=0,
1031                                         readonly=True))
1032         d.addCallback(lambda ign:
1033             self._add_server_with_share(server_number=1, share_number=0,
1034                                         readonly=True))
1035         def _prepare2():
1036             client = self.g.clients[0]
1037             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1038             return client
1039         d.addCallback(lambda ign:
1040             _prepare2())
1041         d.addCallback(lambda client:
1042             self.shouldFail(UploadUnhappinessError,
1043                             "test_problem_layout_comment_52_test_2",
1044                             "shares could be placed on only 3 server(s) such "
1045                             "that any 3 of them have enough shares to recover "
1046                             "the file, but we were asked to place shares on "
1047                             "at least 4 such servers.",
1048                             client.upload, upload.Data("data" * 10000,
1049                                                        convergence="")))
1050         return d
1051
1052
1053     def test_problem_layout_comment_53(self):
1054         # This scenario is at
1055         # http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:53
1056         #
1057         # Set up the grid to have one server
1058         def _change_basedir(ign):
1059             self.basedir = self.mktemp()
1060         _change_basedir(None)
1061         # We start by uploading all of the shares to one server.
1062         # Next, we'll add three new servers to our NoNetworkGrid. We'll add
1063         # one share from our initial upload to each of these.
1064         # The counterintuitive ordering of the share numbers is to deal with
1065         # the permuting of these servers -- distributing the shares this
1066         # way ensures that the Tahoe2ServerSelector sees them in the order
1067         # described below.
1068         d = self._setup_and_upload()
1069         d.addCallback(lambda ign:
1070             self._add_server_with_share(server_number=1, share_number=2))
1071         d.addCallback(lambda ign:
1072             self._add_server_with_share(server_number=2, share_number=0))
1073         d.addCallback(lambda ign:
1074             self._add_server_with_share(server_number=3, share_number=1))
1075         # So, we now have the following layout:
1076         # server 0: shares 0 - 9
1077         # server 1: share 2
1078         # server 2: share 0
1079         # server 3: share 1
1080         # We change the 'happy' parameter in the client to 4.
1081         # The Tahoe2ServerSelector will see the servers permuted as:
1082         # 2, 3, 1, 0
1083         # Ideally, a reupload of our original data should work.
1084         def _reset_encoding_parameters(ign, happy=4):
1085             client = self.g.clients[0]
1086             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1087             return client
1088         d.addCallback(_reset_encoding_parameters)
1089         d.addCallback(lambda client:
1090             client.upload(upload.Data("data" * 10000, convergence="")))
1091         d.addCallback(lambda ign:
1092             self.failUnless(self._has_happy_share_distribution()))
1093
1094
1095         # This scenario is basically comment:53, but changed so that the
1096         # Tahoe2ServerSelector sees the server with all of the shares before
1097         # any of the other servers.
1098         # The layout is:
1099         # server 2: shares 0 - 9
1100         # server 3: share 0
1101         # server 1: share 1
1102         # server 4: share 2
1103         # The Tahoe2ServerSelector sees the servers permuted as:
1104         # 2, 3, 1, 4
1105         # Note that server 0 has been replaced by server 4; this makes it
1106         # easier to ensure that the last server seen by Tahoe2ServerSelector
1107         # has only one share.
1108         d.addCallback(_change_basedir)
1109         d.addCallback(lambda ign:
1110             self._setup_and_upload())
1111         d.addCallback(lambda ign:
1112             self._add_server_with_share(server_number=2, share_number=0))
1113         d.addCallback(lambda ign:
1114             self._add_server_with_share(server_number=3, share_number=1))
1115         d.addCallback(lambda ign:
1116             self._add_server_with_share(server_number=1, share_number=2))
1117         # Copy all of the other shares to server number 2
1118         def _copy_shares(ign):
1119             for i in xrange(0, 10):
1120                 self._copy_share_to_server(i, 2)
1121         d.addCallback(_copy_shares)
1122         # Remove the first server, and add a placeholder with share 0
1123         d.addCallback(lambda ign:
1124             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1125         d.addCallback(lambda ign:
1126             self._add_server_with_share(server_number=4, share_number=0))
1127         # Now try uploading.
1128         d.addCallback(_reset_encoding_parameters)
1129         d.addCallback(lambda client:
1130             client.upload(upload.Data("data" * 10000, convergence="")))
1131         d.addCallback(lambda ign:
1132             self.failUnless(self._has_happy_share_distribution()))
1133
1134
1135         # Try the same thing, but with empty servers after the first one
1136         # We want to make sure that Tahoe2ServerSelector will redistribute
1137         # shares as necessary, not simply discover an existing layout.
1138         # The layout is:
1139         # server 2: shares 0 - 9
1140         # server 3: empty
1141         # server 1: empty
1142         # server 4: empty
1143         d.addCallback(_change_basedir)
1144         d.addCallback(lambda ign:
1145             self._setup_and_upload())
1146         d.addCallback(lambda ign:
1147             self._add_server(server_number=2))
1148         d.addCallback(lambda ign:
1149             self._add_server(server_number=3))
1150         d.addCallback(lambda ign:
1151             self._add_server(server_number=1))
1152         d.addCallback(lambda ign:
1153             self._add_server(server_number=4))
1154         d.addCallback(_copy_shares)
1155         d.addCallback(lambda ign:
1156             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1157         d.addCallback(_reset_encoding_parameters)
1158         d.addCallback(lambda client:
1159             client.upload(upload.Data("data" * 10000, convergence="")))
1160         # Make sure that only as many shares as necessary to satisfy
1161         # servers of happiness were pushed.
1162         d.addCallback(lambda results:
1163             self.failUnlessEqual(results.get_pushed_shares(), 3))
1164         d.addCallback(lambda ign:
1165             self.failUnless(self._has_happy_share_distribution()))
1166         return d
1167
1168     def test_problem_layout_ticket_1124(self):
1169         self.basedir = self.mktemp()
1170         d = self._setup_and_upload(k=2, n=4)
1171
1172         # server 0: shares 0, 1, 2, 3
1173         # server 1: shares 0, 3
1174         # server 2: share 1
1175         # server 3: share 2
1176         # With this layout, an upload should just be satisfied that the current distribution is good enough, right?
1177         def _setup(ign):
1178             self._add_server_with_share(server_number=0, share_number=None)
1179             self._add_server_with_share(server_number=1, share_number=0)
1180             self._add_server_with_share(server_number=2, share_number=1)
1181             self._add_server_with_share(server_number=3, share_number=2)
1182             # Copy shares
1183             self._copy_share_to_server(3, 1)
1184             client = self.g.clients[0]
1185             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1186             return client
1187
1188         d.addCallback(_setup)
1189         d.addCallback(lambda client:
1190             client.upload(upload.Data("data" * 10000, convergence="")))
1191         d.addCallback(lambda ign:
1192             self.failUnless(self._has_happy_share_distribution()))
1193         return d
1194     test_problem_layout_ticket_1124.todo = "Fix this after 1.7.1 release."
1195
1196     def test_happiness_with_some_readonly_servers(self):
1197         # Try the following layout
1198         # server 2: shares 0-9
1199         # server 4: share 0, read-only
1200         # server 3: share 1, read-only
1201         # server 1: share 2, read-only
1202         self.basedir = self.mktemp()
1203         d = self._setup_and_upload()
1204         d.addCallback(lambda ign:
1205             self._add_server_with_share(server_number=2, share_number=0))
1206         d.addCallback(lambda ign:
1207             self._add_server_with_share(server_number=3, share_number=1,
1208                                         readonly=True))
1209         d.addCallback(lambda ign:
1210             self._add_server_with_share(server_number=1, share_number=2,
1211                                         readonly=True))
1212         # Copy all of the other shares to server number 2
1213         def _copy_shares(ign):
1214             for i in xrange(1, 10):
1215                 self._copy_share_to_server(i, 2)
1216         d.addCallback(_copy_shares)
1217         # Remove server 0, and add another in its place
1218         d.addCallback(lambda ign:
1219             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1220         d.addCallback(lambda ign:
1221             self._add_server_with_share(server_number=4, share_number=0,
1222                                         readonly=True))
1223         def _reset_encoding_parameters(ign, happy=4):
1224             client = self.g.clients[0]
1225             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1226             return client
1227         d.addCallback(_reset_encoding_parameters)
1228         d.addCallback(lambda client:
1229             client.upload(upload.Data("data" * 10000, convergence="")))
1230         d.addCallback(lambda ign:
1231             self.failUnless(self._has_happy_share_distribution()))
1232         return d
1233
1234
1235     def test_happiness_with_all_readonly_servers(self):
1236         # server 3: share 1, read-only
1237         # server 1: share 2, read-only
1238         # server 2: shares 0-9, read-only
1239         # server 4: share 0, read-only
1240         # The idea with this test is to make sure that the survey of
1241         # read-only servers doesn't undercount servers of happiness
1242         self.basedir = self.mktemp()
1243         d = self._setup_and_upload()
1244         d.addCallback(lambda ign:
1245             self._add_server_with_share(server_number=4, share_number=0,
1246                                         readonly=True))
1247         d.addCallback(lambda ign:
1248             self._add_server_with_share(server_number=3, share_number=1,
1249                                         readonly=True))
1250         d.addCallback(lambda ign:
1251             self._add_server_with_share(server_number=1, share_number=2,
1252                                         readonly=True))
1253         d.addCallback(lambda ign:
1254             self._add_server_with_share(server_number=2, share_number=0,
1255                                         readonly=True))
1256         def _copy_shares(ign):
1257             for i in xrange(1, 10):
1258                 self._copy_share_to_server(i, 2)
1259         d.addCallback(_copy_shares)
1260         d.addCallback(lambda ign:
1261             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1262         def _reset_encoding_parameters(ign, happy=4):
1263             client = self.g.clients[0]
1264             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1265             return client
1266         d.addCallback(_reset_encoding_parameters)
1267         d.addCallback(lambda client:
1268             client.upload(upload.Data("data" * 10000, convergence="")))
1269         d.addCallback(lambda ign:
1270             self.failUnless(self._has_happy_share_distribution()))
1271         return d
1272
1273
1274     def test_dropped_servers_in_encoder(self):
1275         # The Encoder does its own "servers_of_happiness" check if it
1276         # happens to lose a bucket during an upload (it assumes that
1277         # the layout presented to it satisfies "servers_of_happiness"
1278         # until a failure occurs)
1279         #
1280         # This test simulates an upload where servers break after server
1281         # selection, but before they are written to.
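        # (Per the comments below, the argument to _do_upload_with_broken_servers,
        # a helper defined on this test class, is the number of servers to break
        # after server selection.)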
1282         def _set_basedir(ign=None):
1283             self.basedir = self.mktemp()
1284         _set_basedir()
1285         d = self._setup_and_upload()
1286         # Add 5 servers
1287         def _do_server_setup(ign):
1288             self._add_server(server_number=1)
1289             self._add_server(server_number=2)
1290             self._add_server(server_number=3)
1291             self._add_server(server_number=4)
1292             self._add_server(server_number=5)
1293         d.addCallback(_do_server_setup)
1294         # remove the original server
1295         # (necessary to ensure that the Tahoe2ServerSelector will distribute
1296         #  all the shares)
1297         def _remove_server(ign):
1298             server = self.g.servers_by_number[0]
1299             self.g.remove_server(server.my_nodeid)
1300         d.addCallback(_remove_server)
1301         # This should succeed; after one server breaks we still have 4
1302         # working servers, and the happiness requirement of the upload is 4.
1303         d.addCallback(lambda ign:
1304             self._do_upload_with_broken_servers(1))
1305         # Now, do the same thing over again, but drop 2 servers instead
1306         # of 1. This should fail, because servers_of_happiness is 4 and
1307         # we can't satisfy that.
1308         d.addCallback(_set_basedir)
1309         d.addCallback(lambda ign:
1310             self._setup_and_upload())
1311         d.addCallback(_do_server_setup)
1312         d.addCallback(_remove_server)
1313         d.addCallback(lambda ign:
1314             self.shouldFail(UploadUnhappinessError,
1315                             "test_dropped_servers_in_encoder",
1316                             "shares could be placed on only 3 server(s) "
1317                             "such that any 3 of them have enough shares to "
1318                             "recover the file, but we were asked to place "
1319                             "shares on at least 4",
1320                             self._do_upload_with_broken_servers, 2))
1321         # Now do the same thing over again, but make some of the servers
1322         # readonly, break some of the ones that aren't, and make sure that
1323         # happiness accounting is preserved.
1324         d.addCallback(_set_basedir)
1325         d.addCallback(lambda ign:
1326             self._setup_and_upload())
1327         def _do_server_setup_2(ign):
1328             self._add_server(1)
1329             self._add_server(2)
1330             self._add_server(3)
1331             self._add_server_with_share(4, 7, readonly=True)
1332             self._add_server_with_share(5, 8, readonly=True)
1333         d.addCallback(_do_server_setup_2)
1334         d.addCallback(_remove_server)
1335         d.addCallback(lambda ign:
1336             self._do_upload_with_broken_servers(1))
1337         d.addCallback(_set_basedir)
1338         d.addCallback(lambda ign:
1339             self._setup_and_upload())
1340         d.addCallback(_do_server_setup_2)
1341         d.addCallback(_remove_server)
1342         d.addCallback(lambda ign:
1343             self.shouldFail(UploadUnhappinessError,
1344                             "test_dropped_servers_in_encoder",
1345                             "shares could be placed on only 3 server(s) "
1346                             "such that any 3 of them have enough shares to "
1347                             "recover the file, but we were asked to place "
1348                             "shares on at least 4",
1349                             self._do_upload_with_broken_servers, 2))
1350         return d
1351
1352
1353     def test_merge_servers(self):
1354         # merge_servers merges a dict of shareid -> set(serverids) mappings
1355         # with a set of upload server trackers.
1356         shares = {
1357                     1 : set(["server1"]),
1358                     2 : set(["server2"]),
1359                     3 : set(["server3"]),
1360                     4 : set(["server4", "server5"]),
1361                     5 : set(["server1", "server2"]),
1362                  }
1363         # if not provided with any upload_servers (an empty set), it should
1364         # just return the first argument unchanged.
1365         self.failUnlessEqual(shares, merge_servers(shares, set([])))
1366         trackers = []
1367         for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1368             t = FakeServerTracker(server, [i])
1369             trackers.append(t)
1370         expected = {
1371                     1 : set(["server1"]),
1372                     2 : set(["server2"]),
1373                     3 : set(["server3"]),
1374                     4 : set(["server4", "server5"]),
1375                     5 : set(["server1", "server2", "server5"]),
1376                     6 : set(["server6"]),
1377                     7 : set(["server7"]),
1378                     8 : set(["server8"]),
1379                    }
1380         self.failUnlessEqual(expected, merge_servers(shares, set(trackers)))
1381         shares2 = {}
1382         expected = {
1383                     5 : set(["server5"]),
1384                     6 : set(["server6"]),
1385                     7 : set(["server7"]),
1386                     8 : set(["server8"]),
1387                    }
1388         self.failUnlessEqual(expected, merge_servers(shares2, set(trackers)))
1389         shares3 = {}
1390         trackers = []
1391         expected = {}
1392         for (i, server) in [(i, "server%d" % i) for i in xrange(10)]:
1393             shares3[i] = set([server])
1394             t = FakeServerTracker(server, [i])
1395             trackers.append(t)
1396             expected[i] = set([server])
1397         self.failUnlessEqual(expected, merge_servers(shares3, set(trackers)))
1398
1399
1400     def test_servers_of_happiness_utility_function(self):
1401         # These tests are concerned with the servers_of_happiness()
1402         # utility function, and its underlying matching algorithm. Other
1403         # aspects of the servers_of_happiness behavior are tested
1404         # elsewhere. These tests exist to ensure that
1405         # servers_of_happiness doesn't under- or overcount the happiness
1406         # value for given inputs.
1407
1408         # servers_of_happiness expects a dict of
1409         # shnum => set(serverids) as a preexisting shares argument.
1410         test1 = {
1411                  1 : set(["server1"]),
1412                  2 : set(["server2"]),
1413                  3 : set(["server3"]),
1414                  4 : set(["server4"])
1415                 }
1416         happy = servers_of_happiness(test1)
1417         self.failUnlessEqual(4, happy)
1418         test1[4] = set(["server1"])
1419         # We've added a duplicate server, so now servers_of_happiness
1420         # should be 3 instead of 4.
1421         happy = servers_of_happiness(test1)
1422         self.failUnlessEqual(3, happy)
1423         # The second argument of merge_servers should be a set of objects with
1424         # serverid and buckets as attributes. In actual use, these will be
1425         # ServerTracker instances, but for testing it is fine to use a
1426         # FakeServerTracker that simply holds those attributes.
1428         trackers = []
1429         for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1430             t = FakeServerTracker(server, [i])
1431             trackers.append(t)
1432         # Recall that test1 is a server layout with servers_of_happiness
1433         # = 3.  Since there isn't any overlap between the shnum ->
1434         # set([serverid]) correspondences in test1 and those in trackers,
1435         # the result here should be 7.
1436         test2 = merge_servers(test1, set(trackers))
1437         happy = servers_of_happiness(test2)
1438         self.failUnlessEqual(7, happy)
1439         # Now add an overlapping server to trackers. This is redundant,
1440         # so it should not cause the previously reported happiness value
1441         # to change.
1442         t = FakeServerTracker("server1", [1])
1443         trackers.append(t)
1444         test2 = merge_servers(test1, set(trackers))
1445         happy = servers_of_happiness(test2)
1446         self.failUnlessEqual(7, happy)
1447         test = {}
1448         happy = servers_of_happiness(test)
1449         self.failUnlessEqual(0, happy)
1450         # Test a more substantial overlap between the trackers and the
1451         # existing assignments.
1452         test = {
1453             1 : set(['server1']),
1454             2 : set(['server2']),
1455             3 : set(['server3']),
1456             4 : set(['server4']),
1457         }
1458         trackers = []
1459         t = FakeServerTracker('server5', [4])
1460         trackers.append(t)
1461         t = FakeServerTracker('server6', [3, 5])
1462         trackers.append(t)
1463         # The value returned by servers_of_happiness is the size
1464         # of a maximum matching in the bipartite graph that
1465         # servers_of_happiness() makes between serverids and share
1466         # numbers. It should find something like this:
1467         # (server 1, share 1)
1468         # (server 2, share 2)
1469         # (server 3, share 3)
1470         # (server 5, share 4)
1471         # (server 6, share 5)
1472         #
1473         # and, since there are 5 edges in this matching, it should
1474         # return 5.
1475         test2 = merge_servers(test, set(trackers))
1476         happy = servers_of_happiness(test2)
1477         self.failUnlessEqual(5, happy)
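        # As an illustrative cross-check of the matching picture sketched
        # above (a minimal brute-force sketch, not part of the
        # servers_of_happiness API, and far too slow for real layouts):
        # for each share, either leave it unplaced or match it to a server
        # that holds it and has not been used yet, and take the best result.
        def _brute_force_happiness(layout, used=frozenset()):
            if not layout:
                return 0
            shnum = sorted(layout)[0]
            rest = dict(layout)
            del rest[shnum]
            # best result if this share contributes nothing to the matching
            best = _brute_force_happiness(rest, used)
            for serverid in layout[shnum] - used:
                best = max(best,
                           1 + _brute_force_happiness(rest, used | set([serverid])))
            return best
        self.failUnlessEqual(5, _brute_force_happiness(test2))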
1478         # Zooko's first puzzle:
1479         # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:156)
1480         #
1481         # server 1: shares 0, 1
1482         # server 2: shares 1, 2
1483         # server 3: share 2
1484         #
1485         # This should yield happiness of 3.
1486         test = {
1487             0 : set(['server1']),
1488             1 : set(['server1', 'server2']),
1489             2 : set(['server2', 'server3']),
1490         }
1491         self.failUnlessEqual(3, servers_of_happiness(test))
1492         # Zooko's second puzzle:
1493         # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:158)
1494         #
1495         # server 1: shares 0, 1
1496         # server 2: share 1
1497         #
1498         # This should yield happiness of 2.
1499         test = {
1500             0 : set(['server1']),
1501             1 : set(['server1', 'server2']),
1502         }
1503         self.failUnlessEqual(2, servers_of_happiness(test))
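        # One more hand-checkable case (illustrative only): if every share
        # lives on the same single server, the matching can contain at most
        # one edge, so this should yield happiness of 1.
        test = {
            0 : set(['server1']),
            1 : set(['server1']),
            2 : set(['server1']),
        }
        self.failUnlessEqual(1, servers_of_happiness(test))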
1504
1505
1506     def test_shares_by_server(self):
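        # shares_by_server inverts a shnum -> set(serverids) mapping into a
        # serverid -> set(shnums) mapping.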
1507         test = dict([(i, set(["server%d" % i])) for i in xrange(1, 5)])
1508         sbs = shares_by_server(test)
1509         self.failUnlessEqual(set([1]), sbs["server1"])
1510         self.failUnlessEqual(set([2]), sbs["server2"])
1511         self.failUnlessEqual(set([3]), sbs["server3"])
1512         self.failUnlessEqual(set([4]), sbs["server4"])
1513         test1 = {
1514                     1 : set(["server1"]),
1515                     2 : set(["server1"]),
1516                     3 : set(["server1"]),
1517                     4 : set(["server2"]),
1518                     5 : set(["server2"])
1519                 }
1520         sbs = shares_by_server(test1)
1521         self.failUnlessEqual(set([1, 2, 3]), sbs["server1"])
1522         self.failUnlessEqual(set([4, 5]), sbs["server2"])
1523         # This should fail unless the serverid part of the mapping is a set
1524         test2 = {1: "server1"}
1525         self.shouldFail(AssertionError,
1526                        "test_shares_by_server",
1527                        "",
1528                        shares_by_server, test2)
1529
1530
1531     def test_existing_share_detection(self):
1532         self.basedir = self.mktemp()
1533         d = self._setup_and_upload()
1534         # Our final setup should look like this:
1535         # server 1: shares 0 - 9, read-only
1536         # server 2: empty
1537         # server 3: empty
1538         # server 4: empty
1539         # The purpose of this test is to make sure that the server selector
1540         # knows about the shares on server 1, even though it is read-only.
1541         # It used to simply filter these out, which would cause the test
1542         # to fail when servers_of_happiness = 4.
1543         d.addCallback(lambda ign:
1544             self._add_server_with_share(1, 0, True))
1545         d.addCallback(lambda ign:
1546             self._add_server(2))
1547         d.addCallback(lambda ign:
1548             self._add_server(3))
1549         d.addCallback(lambda ign:
1550             self._add_server(4))
1551         def _copy_shares(ign):
1552             for i in xrange(1, 10):
1553                 self._copy_share_to_server(i, 1)
1554         d.addCallback(_copy_shares)
1555         d.addCallback(lambda ign:
1556             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1557         def _prepare_client(ign):
1558             client = self.g.clients[0]
1559             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1560             return client
1561         d.addCallback(_prepare_client)
1562         d.addCallback(lambda client:
1563             client.upload(upload.Data("data" * 10000, convergence="")))
1564         d.addCallback(lambda ign:
1565             self.failUnless(self._has_happy_share_distribution()))
1566         return d
1567
1568
1569     def test_query_counting(self):
1570         # If server selection fails, Tahoe2ServerSelector prints out a lot
1571         # of helpful diagnostic information, including query stats.
1572         # This test helps make sure that that information is accurate.
1573         self.basedir = self.mktemp()
1574         d = self._setup_and_upload()
1575         def _setup(ign):
1576             for i in xrange(1, 11):
1577                 self._add_server(server_number=i)
1578             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1579             c = self.g.clients[0]
1580             # We set happy to an unsatisfiable value so that we can check the
1581             # counting in the exception message. The same progress message
1582             # is also used when the upload is successful, but in that case it
1583             # only gets written to a log, so we can't see what it says.
1584             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1585             return c
1586         d.addCallback(_setup)
1587         d.addCallback(lambda c:
1588             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1589                             "10 queries placed some shares",
1590                             c.upload, upload.Data("data" * 10000,
1591                                                   convergence="")))
1592         # Now try with some readonly servers. We want to make sure that
1593         # the readonly server share discovery phase is counted correctly.
1594         def _reset(ign):
1595             self.basedir = self.mktemp()
1596             self.g = None
1597         d.addCallback(_reset)
1598         d.addCallback(lambda ign:
1599             self._setup_and_upload())
1600         def _then(ign):
1601             for i in xrange(1, 11):
1602                 self._add_server(server_number=i)
1603             self._add_server(server_number=11, readonly=True)
1604             self._add_server(server_number=12, readonly=True)
1605             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1606             c = self.g.clients[0]
1607             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1608             return c
1609         d.addCallback(_then)
1610         d.addCallback(lambda c:
1611             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1612                             "2 placed none (of which 2 placed none due to "
1613                             "the server being full",
1614                             c.upload, upload.Data("data" * 10000,
1615                                                   convergence="")))
1616         # Now try the case where the upload process finds a bunch of the
1617         # shares that it wants to place on the first server, including
1618         # the one that it wanted to allocate there. Though no shares will
1619         # be allocated in this request, it should still be called
1620         # productive, since it caused some homeless shares to be
1621         # removed.
1622         d.addCallback(_reset)
1623         d.addCallback(lambda ign:
1624             self._setup_and_upload())
1625
1626         def _next(ign):
1627             for i in xrange(1, 11):
1628                 self._add_server(server_number=i)
1629             # Copy all of the shares to server 9, since that will be
1630             # the first one that the selector sees.
1631             for i in xrange(10):
1632                 self._copy_share_to_server(i, 9)
1633             # Remove server 0, and its contents
1634             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1635             # Make happiness unsatisfiable
1636             c = self.g.clients[0]
1637             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1638             return c
1639         d.addCallback(_next)
1640         d.addCallback(lambda c:
1641             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1642                             "1 queries placed some shares",
1643                             c.upload, upload.Data("data" * 10000,
1644                                                   convergence="")))
1645         return d
1646
1647
1648     def test_upper_limit_on_readonly_queries(self):
1649         self.basedir = self.mktemp()
1650         d = self._setup_and_upload()
1651         def _then(ign):
1652             for i in xrange(1, 11):
1653                 self._add_server(server_number=i, readonly=True)
1654             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1655             c = self.g.clients[0]
1656             c.DEFAULT_ENCODING_PARAMETERS['k'] = 2
1657             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1658             c.DEFAULT_ENCODING_PARAMETERS['n'] = 4
1659             return c
1660         d.addCallback(_then)
1661         d.addCallback(lambda client:
1662             self.shouldFail(UploadUnhappinessError,
1663                             "test_upper_limit_on_readonly_queries",
1664                             "sent 8 queries to 8 servers",
1665                             client.upload,
1666                             upload.Data('data' * 10000, convergence="")))
1667         return d
1668
1669
1670     def test_exception_messages_during_server_selection(self):
1671         # server 1: read-only, no shares
1672         # server 2: read-only, no shares
1673         # server 3: read-only, no shares
1674         # server 4: read-only, no shares
1675         # server 5: read-only, no shares
1676         # This will fail, but we want to make sure that the log messages
1677         # are informative about why it has failed.
1678         self.basedir = self.mktemp()
1679         d = self._setup_and_upload()
1680         d.addCallback(lambda ign:
1681             self._add_server(server_number=1, readonly=True))
1682         d.addCallback(lambda ign:
1683             self._add_server(server_number=2, readonly=True))
1684         d.addCallback(lambda ign:
1685             self._add_server(server_number=3, readonly=True))
1686         d.addCallback(lambda ign:
1687             self._add_server(server_number=4, readonly=True))
1688         d.addCallback(lambda ign:
1689             self._add_server(server_number=5, readonly=True))
1690         d.addCallback(lambda ign:
1691             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1692         def _reset_encoding_parameters(ign, happy=4):
1693             client = self.g.clients[0]
1694             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1695             return client
1696         d.addCallback(_reset_encoding_parameters)
1697         d.addCallback(lambda client:
1698             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1699                             "placed 0 shares out of 10 "
1700                             "total (10 homeless), want to place shares on at "
1701                             "least 4 servers such that any 3 of them have "
1702                             "enough shares to recover the file, "
1703                             "sent 5 queries to 5 servers, 0 queries placed "
1704                             "some shares, 5 placed none "
1705                             "(of which 5 placed none due to the server being "
1706                             "full and 0 placed none due to an error)",
1707                             client.upload,
1708                             upload.Data("data" * 10000, convergence="")))
1709
1710
1711         # server 1: read-only, no shares
1712         # server 2: broken, no shares
1713         # server 3: read-only, no shares
1714         # server 4: read-only, no shares
1715         # server 5: read-only, no shares
1716         def _reset(ign):
1717             self.basedir = self.mktemp()
1718         d.addCallback(_reset)
1719         d.addCallback(lambda ign:
1720             self._setup_and_upload())
1721         d.addCallback(lambda ign:
1722             self._add_server(server_number=1, readonly=True))
1723         d.addCallback(lambda ign:
1724             self._add_server(server_number=2))
1725         def _break_server_2(ign):
1726             serverid = self.g.servers_by_number[2].my_nodeid
1727             self.g.break_server(serverid)
1728         d.addCallback(_break_server_2)
1729         d.addCallback(lambda ign:
1730             self._add_server(server_number=3, readonly=True))
1731         d.addCallback(lambda ign:
1732             self._add_server(server_number=4, readonly=True))
1733         d.addCallback(lambda ign:
1734             self._add_server(server_number=5, readonly=True))
1735         d.addCallback(lambda ign:
1736             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1737         d.addCallback(_reset_encoding_parameters)
1738         d.addCallback(lambda client:
1739             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1740                             "placed 0 shares out of 10 "
1741                             "total (10 homeless), want to place shares on at "
1742                             "least 4 servers such that any 3 of them have "
1743                             "enough shares to recover the file, "
1744                             "sent 5 queries to 5 servers, 0 queries placed "
1745                             "some shares, 5 placed none "
1746                             "(of which 4 placed none due to the server being "
1747                             "full and 1 placed none due to an error)",
1748                             client.upload,
1749                             upload.Data("data" * 10000, convergence="")))
1750         # server 0, server 1 = empty, accepting shares
1751         # This should place all of the shares, but still fail with happy=4.
1752         # We want to make sure that the exception message is worded correctly.
1753         d.addCallback(_reset)
1754         d.addCallback(lambda ign:
1755             self._setup_grid())
1756         d.addCallback(lambda ign:
1757             self._add_server(server_number=1))
1758         d.addCallback(_reset_encoding_parameters)
1759         d.addCallback(lambda client:
1760             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1761                             "shares could be placed or found on only 2 "
1762                             "server(s). We were asked to place shares on at "
1763                             "least 4 server(s) such that any 3 of them have "
1764                             "enough shares to recover the file.",
1765                             client.upload, upload.Data("data" * 10000,
1766                                                        convergence="")))
1767         # servers 0 - 4 = empty, accepting shares
1768         # This too should place all the shares, and this too should fail,
1769         # but since the effective happiness is more than the k encoding
1770         # parameter, it should trigger a different error message than the one
1771         # above.
1772         d.addCallback(_reset)
1773         d.addCallback(lambda ign:
1774             self._setup_grid())
1775         d.addCallback(lambda ign:
1776             self._add_server(server_number=1))
1777         d.addCallback(lambda ign:
1778             self._add_server(server_number=2))
1779         d.addCallback(lambda ign:
1780             self._add_server(server_number=3))
1781         d.addCallback(lambda ign:
1782             self._add_server(server_number=4))
1783         d.addCallback(_reset_encoding_parameters, happy=7)
1784         d.addCallback(lambda client:
1785             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1786                             "shares could be placed on only 5 server(s) such "
1787                             "that any 3 of them have enough shares to recover "
1788                             "the file, but we were asked to place shares on "
1789                             "at least 7 such servers.",
1790                             client.upload, upload.Data("data" * 10000,
1791                                                        convergence="")))
1792         # server 0: shares 0 - 9
1793         # server 1: share 0, read-only
1794         # server 2: share 0, read-only
1795         # server 3: share 0, read-only
1796         # This should place all of the shares, but fail with happy=4.
1797         # Since the number of servers with shares is more than the number
1798         # necessary to reconstitute the file, this will trigger a different
1799         # error message than either of those above.
1800         d.addCallback(_reset)
1801         d.addCallback(lambda ign:
1802             self._setup_and_upload())
1803         d.addCallback(lambda ign:
1804             self._add_server_with_share(server_number=1, share_number=0,
1805                                         readonly=True))
1806         d.addCallback(lambda ign:
1807             self._add_server_with_share(server_number=2, share_number=0,
1808                                         readonly=True))
1809         d.addCallback(lambda ign:
1810             self._add_server_with_share(server_number=3, share_number=0,
1811                                         readonly=True))
1812         d.addCallback(_reset_encoding_parameters, happy=7)
1813         d.addCallback(lambda client:
1814             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1815                             "shares could be placed or found on 4 server(s), "
1816                             "but they are not spread out evenly enough to "
1817                             "ensure that any 3 of these servers would have "
1818                             "enough shares to recover the file. We were asked "
1819                             "to place shares on at least 7 servers such that "
1820                             "any 3 of them have enough shares to recover the "
1821                             "file",
1822                             client.upload, upload.Data("data" * 10000,
1823                                                        convergence="")))
1824         return d
1825
1826
1827     def test_problem_layout_comment_187(self):
1828         # The layout described in #778 comment 187 broke an initial attempt at a share
1829         # redistribution algorithm. This test is here to demonstrate the
1830         # breakage, and to test that subsequent algorithms don't also
1831         # break in the same way.
1832         self.basedir = self.mktemp()
1833         d = self._setup_and_upload(k=2, n=3)
1834
1835         # server 1: shares 0, 1, 2, readonly
1836         # server 2: share 0, readonly
1837         # server 3: share 0
1838         def _setup(ign):
1839             self._add_server_with_share(server_number=1, share_number=0,
1840                                         readonly=True)
1841             self._add_server_with_share(server_number=2, share_number=0,
1842                                         readonly=True)
1843             self._add_server_with_share(server_number=3, share_number=0)
1844             # Copy shares
1845             self._copy_share_to_server(1, 1)
1846             self._copy_share_to_server(2, 1)
1847             # Remove server 0
1848             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1849             client = self.g.clients[0]
1850             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 3
1851             return client
1852
1853         d.addCallback(_setup)
1854         d.addCallback(lambda client:
1855             client.upload(upload.Data("data" * 10000, convergence="")))
1856         d.addCallback(lambda ign:
1857             self.failUnless(self._has_happy_share_distribution()))
1858         return d
1859     test_problem_layout_comment_187.todo = "this isn't fixed yet"
1860
1861     def test_problem_layout_ticket_1118(self):
1862         # #1118 includes a report from a user who hit an assertion in
1863         # the upload code with this layout.
1864         self.basedir = self.mktemp()
1865         d = self._setup_and_upload(k=2, n=4)
1866
1867         # server 0: no shares
1868         # server 1: shares 0, 3
1869         # server 3: share 1
1870         # server 2: share 2
1871         # The order that they get queries is 0, 1, 3, 2
1872         def _setup(ign):
1873             self._add_server(server_number=0)
1874             self._add_server_with_share(server_number=1, share_number=0)
1875             self._add_server_with_share(server_number=2, share_number=2)
1876             self._add_server_with_share(server_number=3, share_number=1)
1877             # Copy shares
1878             self._copy_share_to_server(3, 1)
1879             storedir = self.get_serverdir(0)
1880             # remove the storedir, wiping out any existing shares
1881             shutil.rmtree(storedir)
1882             # create an empty storedir to replace the one we just removed
1883             os.mkdir(storedir)
1884             client = self.g.clients[0]
1885             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1886             return client
1887
1888         d.addCallback(_setup)
1889         # Note: actually it should succeed! See
1890         # test_problem_layout_ticket_1128. But ticket 1118 is just to
1891         # make it realize that it has failed, so if it raises
1892         # UploadUnhappinessError then we'll give it the green light
1893         # for now.
1894         d.addCallback(lambda ignored:
1895             self.shouldFail(UploadUnhappinessError,
1896                             "test_problem_layout_ticket_1118",
1897                             "",
1898                             self.g.clients[0].upload, upload.Data("data" * 10000,
1899                                                        convergence="")))
1900         return d
1901
1902     def test_problem_layout_ticket_1128(self):
1903         # This is the layout reported in #1118 (see the previous test);
1904         # ticket #1128 asks that the uploader succeed with it.
1905         self.basedir = self.mktemp()
1906         d = self._setup_and_upload(k=2, n=4)
1907
1908         # server 0: no shares
1909         # server 1: shares 0, 3
1910         # server 3: share 1
1911         # server 2: share 2
1912         # The order that they get queries is 0, 1, 3, 2
1913         def _setup(ign):
1914             self._add_server(server_number=0)
1915             self._add_server_with_share(server_number=1, share_number=0)
1916             self._add_server_with_share(server_number=2, share_number=2)
1917             self._add_server_with_share(server_number=3, share_number=1)
1918             # Copy shares
1919             self._copy_share_to_server(3, 1)
1920             storedir = self.get_serverdir(0)
1921             # remove the storedir, wiping out any existing shares
1922             shutil.rmtree(storedir)
1923             # create an empty storedir to replace the one we just removed
1924             os.mkdir(storedir)
1925             client = self.g.clients[0]
1926             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1927             return client
1928
1929         d.addCallback(_setup)
1930         d.addCallback(lambda client:
1931                           client.upload(upload.Data("data" * 10000, convergence="")))
1932         d.addCallback(lambda ign:
1933             self.failUnless(self._has_happy_share_distribution()))
1934         return d
1935     test_problem_layout_ticket_1128.todo = "Invent a smarter uploader that uploads successfully in this case."
1936
1937     def test_upload_succeeds_with_some_homeless_shares(self):
1938         # If the upload is forced to stop trying to place shares before
1939         # it has placed (or otherwise accounted for) all of them, but it
1940         # has placed enough to satisfy the upload health criteria that
1941         # we're using, it should still succeed.
1942         self.basedir = self.mktemp()
1943         d = self._setup_and_upload()
1944         def _server_setup(ign):
1945             # Add four servers so that we have a layout like this:
1946             # server 1: share 0, read-only
1947             # server 2: share 1, read-only
1948             # server 3: share 2, read-only
1949             # server 4: share 3, read-only
1950             # If we set happy = 4, the upload will manage to satisfy
1951             # servers of happiness, but not place all of the shares; we
1952             # want to test that the upload is declared successful in
1953             # this case.
1954             self._add_server_with_share(server_number=1, share_number=0,
1955                                         readonly=True)
1956             self._add_server_with_share(server_number=2, share_number=1,
1957                                         readonly=True)
1958             self._add_server_with_share(server_number=3, share_number=2,
1959                                         readonly=True)
1960             self._add_server_with_share(server_number=4, share_number=3,
1961                                         readonly=True)
1962             # Remove server 0.
1963             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1964             # Set the client appropriately
1965             c = self.g.clients[0]
1966             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1967             return c
1968         d.addCallback(_server_setup)
1969         d.addCallback(lambda client:
1970             client.upload(upload.Data("data" * 10000, convergence="")))
1971         d.addCallback(lambda ign:
1972             self.failUnless(self._has_happy_share_distribution()))
1973         return d
1974
1975
1976     def test_uploader_skips_over_servers_with_only_one_share(self):
1977         # We want to make sure that the redistribution logic ignores
1978         # servers with only one share, since placing these shares
1979         # elsewhere will at best keep happiness the same as it was, and
1980         # at worst hurt it.
1981         self.basedir = self.mktemp()
1982         d = self._setup_and_upload()
1983         def _server_setup(ign):
1984             # Add some servers so that the upload will need to
1985             # redistribute, but will first pass over a couple of servers
1986             # that don't have enough shares to redistribute before
1987             # finding one that does.
1988             self._add_server_with_share(server_number=1, share_number=0)
1989             self._add_server_with_share(server_number=2, share_number=2)
1990             self._add_server_with_share(server_number=3, share_number=1)
1991             self._add_server_with_share(server_number=8, share_number=4)
1992             self._add_server_with_share(server_number=5, share_number=5)
1993             self._add_server_with_share(server_number=10, share_number=7)
1994             for i in xrange(4):
1995                 self._copy_share_to_server(i, 2)
1996             return self.g.clients[0]
1997         d.addCallback(_server_setup)
1998         d.addCallback(lambda client:
1999             client.upload(upload.Data("data" * 10000, convergence="")))
2000         d.addCallback(lambda ign:
2001             self.failUnless(self._has_happy_share_distribution()))
2002         return d
2003
2004
2005     def test_server_selector_bucket_abort(self):
2006         # If server selection for an upload fails due to an unhappy
2007         # layout, the server selection process should abort the buckets it
2008         # allocates before failing, so that the space can be re-used.
2009         self.basedir = self.mktemp()
2010         self.set_up_grid(num_servers=5)
2011
2012         # Try to upload a file with happy=7, which is unsatisfiable with
2013         # the current grid. This will fail, but should not take up any
2014         # space on the storage servers after it fails.
2015         client = self.g.clients[0]
2016         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
2017         d = defer.succeed(None)
2018         d.addCallback(lambda ignored:
2019             self.shouldFail(UploadUnhappinessError,
2020                             "test_server_selection_bucket_abort",
2021                             "",
2022                             client.upload, upload.Data("data" * 10000,
2023                                                        convergence="")))
2024         # wait for the abort messages to get there.
2025         def _turn_barrier(res):
2026             return fireEventually(res)
2027         d.addCallback(_turn_barrier)
2028         def _then(ignored):
2029             for server in self.g.servers_by_number.values():
2030                 self.failUnlessEqual(server.allocated_size(), 0)
2031         d.addCallback(_then)
2032         return d
2033
2034
2035     def test_encoder_bucket_abort(self):
2036         # If enough servers die in the process of encoding and uploading
2037         # a file to make the layout unhappy, we should cancel the
2038         # newly-allocated buckets before dying.
2039         self.basedir = self.mktemp()
2040         self.set_up_grid(num_servers=4)
2041
2042         client = self.g.clients[0]
2043         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
2044
2045         d = defer.succeed(None)
2046         d.addCallback(lambda ignored:
2047             self.shouldFail(UploadUnhappinessError,
2048                             "test_encoder_bucket_abort",
2049                             "",
2050                             self._do_upload_with_broken_servers, 1))
2051         def _turn_barrier(res):
2052             return fireEventually(res)
2053         d.addCallback(_turn_barrier)
2054         def _then(ignored):
2055             for server in self.g.servers_by_number.values():
2056                 self.failUnlessEqual(server.allocated_size(), 0)
2057         d.addCallback(_then)
2058         return d
2059
2060
2061     def _set_up_nodes_extra_config(self, clientdir):
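        # Append [client] encoding overrides (shares.needed = 7,
        # shares.total = 12) to the given client's tahoe.cfg.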
2062         cfgfn = os.path.join(clientdir, "tahoe.cfg")
2063         oldcfg = open(cfgfn, "r").read()
2064         f = open(cfgfn, "wt")
2065         f.write(oldcfg)
2066         f.write("\n")
2067         f.write("[client]\n")
2068         f.write("shares.needed = 7\n")
2069         f.write("shares.total = 12\n")
2070         f.write("\n")
2071         f.close()
2072         return None
2073
2074 # TODO:
2075 #  upload with exactly 75 servers (shares_of_happiness)
2076 #  have a download fail
2077 #  cancel a download (need to implement more cancel stuff)
2078
2079 # from test_encode:
2080 # NoNetworkGrid, upload part of ciphertext, kill server, continue upload
2081 # check with Kevan, they want to live in test_upload, existing tests might cover
2082 #     def test_lost_one_shareholder(self): # these are upload-side tests
2083 #     def test_lost_one_shareholder_early(self):
2084 #     def test_lost_many_shareholders(self):
2085 #     def test_lost_all_shareholders(self):