# -*- coding: utf-8 -*-

import os, shutil
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.python.failure import Failure
from twisted.internet import defer
from foolscap.api import fireEventually

import allmydata # for __full_version__
from allmydata import uri, monitor, client
from allmydata.immutable import upload, encode
from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
from allmydata.util import log
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import DeferredListShouldSucceed
from allmydata.test.no_network import GridTestMixin
from allmydata.test.common_util import ShouldFailMixin
from allmydata.util.happinessutil import servers_of_happiness, \
                                         shares_by_server, merge_servers
from allmydata.storage_client import StorageFarmBroker
from allmydata.storage.server import storage_index_to_dir

MiB = 1024*1024

def extract_uri(results):
    return results.uri

# Some of these took longer than 480 seconds on Zandr's arm box, but this may
# have been due to an earlier test ERROR'ing out due to timeout, which seems
# to screw up subsequent tests.
timeout = 960

class Uploadable(unittest.TestCase):
    def shouldEqual(self, data, expected):
        self.failUnless(isinstance(data, list))
        for e in data:
            self.failUnless(isinstance(e, str))
        s = "".join(data)
        self.failUnlessEqual(s, expected)

    def test_filehandle_random_key(self):
        return self._test_filehandle(convergence=None)

    def test_filehandle_convergent_encryption(self):
        return self._test_filehandle(convergence="some convergence string")

    def _test_filehandle(self, convergence):
        s = StringIO("a"*41)
        u = upload.FileHandle(s, convergence=convergence)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
        d.addCallback(lambda res: s.close()) # that privilege is reserved for us
        return d

    def test_filename(self):
        basedir = "upload/Uploadable/test_filename"
        os.makedirs(basedir)
        fn = os.path.join(basedir, "file")
        f = open(fn, "w")
        f.write("a"*41)
        f.close()
        u = upload.FileName(fn, convergence=None)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close())
        return d

    def test_data(self):
        s = "a"*41
        u = upload.Data(s, convergence=None)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close())
        return d

class ServerError(Exception):
    pass

class SetDEPMixin:
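    # Tests mix this class in and call set_encoding_parameters() to override
    # the fake node's defaults; e.g. (illustrative only, each test picks its
    # own values):
    #   self.set_encoding_parameters(k=3, happy=7, n=10)
    # requests 3-of-10 encoding with a "servers of happiness" of 7.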
    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        p = {"k": k,
             "happy": happy,
             "n": n,
             "max_segment_size": max_segsize,
             }
        self.node.DEFAULT_ENCODING_PARAMETERS = p

class FakeStorageServer:
    def __init__(self, mode):
        self.mode = mode
        self.allocated = []
        self.queries = 0
        self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                         { "maximum-immutable-share-size": 2**32 },
                         "application-version": str(allmydata.__full_version__),
                         }
        if mode == "small":
            self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                             { "maximum-immutable-share-size": 10 },
                             "application-version": str(allmydata.__full_version__),
                             }


    def callRemote(self, methname, *args, **kwargs):
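        # fireEventually() returns a Deferred that fires on a later reactor
        # turn, so these fake remote calls stay asynchronous, like real
        # Foolscap calls.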
        def _call():
            meth = getattr(self, methname)
            return meth(*args, **kwargs)
        d = fireEventually()
        d.addCallback(lambda res: _call())
        return d

    def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
                         sharenums, share_size, canary):
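        # Mimics the remote allocate_buckets() API: returns a tuple of
        # (alreadygot, bucketwriters), where alreadygot is the set of share
        # numbers this server already holds and bucketwriters maps each
        # newly-allocated share number to a writer.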
        #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
        if self.mode == "first-fail":
            if self.queries == 0:
                raise ServerError
        if self.mode == "second-fail":
            if self.queries == 1:
                raise ServerError
        self.queries += 1
        if self.mode == "full":
            return (set(), {},)
        elif self.mode == "already got them":
            return (set(sharenums), {},)
        else:
            for shnum in sharenums:
                self.allocated.append( (storage_index, shnum) )
            return (set(),
                    dict([( shnum, FakeBucketWriter(share_size) )
                          for shnum in sharenums]),
                    )

class FakeBucketWriter:
    # a diagnostic version of storageserver.BucketWriter
    def __init__(self, size):
        self.data = StringIO()
        self.closed = False
        self._size = size

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self, "remote_" + methname)
            return meth(*args, **kwargs)
        d = fireEventually()
        d.addCallback(lambda res: _call())
        return d


    def callRemoteOnly(self, methname, *args, **kwargs):
        d = self.callRemote(methname, *args, **kwargs)
        del d # callRemoteOnly ignores this
        return None


    def remote_write(self, offset, data):
        precondition(not self.closed)
        precondition(offset >= 0)
        precondition(offset+len(data) <= self._size,
                     "offset=%d + data=%d > size=%d" %
                     (offset, len(data), self._size))
        self.data.seek(offset)
        self.data.write(data)

    def remote_close(self):
        precondition(not self.closed)
        self.closed = True

    def remote_abort(self):
        pass

class FakeClient:
    DEFAULT_ENCODING_PARAMETERS = {"k":25,
                                   "happy": 25,
                                   "n": 100,
                                   "max_segment_size": 1*MiB,
                                   }
    def __init__(self, mode="good", num_servers=50):
        self.num_servers = num_servers
        if type(mode) is str:
            mode = dict([i,mode] for i in range(num_servers))
        servers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
                    for fakeid in range(self.num_servers) ]
        self.storage_broker = StorageFarmBroker(None, permute_peers=True)
        for (serverid, rref) in servers:
            self.storage_broker.test_add_rref(serverid, rref)
        self.last_servers = [s[1] for s in servers]

    def log(self, *args, **kwargs):
        pass
    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS
    def get_storage_broker(self):
        return self.storage_broker
    _secret_holder = client.SecretHolder("lease secret", "convergence secret")

class GotTooFarError(Exception):
    pass

class GiganticUploadable(upload.FileHandle):
    def __init__(self, size):
        self._size = size
        self._fp = 0

    def get_encryption_key(self):
        return defer.succeed("\x00" * 16)
    def get_size(self):
        return defer.succeed(self._size)
    def read(self, length):
        left = self._size - self._fp
        length = min(left, length)
        self._fp += length
        if self._fp > 1000000:
            # terminate the test early.
            raise GotTooFarError("we shouldn't be allowed to get this far")
        return defer.succeed(["\x00" * length])
    def close(self):
        pass

DATA = """
Once upon a time, there was a beautiful princess named Buttercup. She lived
in a magical land where every file was stored securely among millions of
machines, and nobody ever worried about their data being lost ever again.
The End.
"""
assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
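# (Files at or below upload.Uploader.URI_LIT_SIZE_THRESHOLD bytes are stored
# inline in a LIT URI instead of being CHK-encoded, so DATA must be bigger
# than the threshold for the "large" tests below to exercise real uploads.)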

SIZE_ZERO = 0
SIZE_SMALL = 16
SIZE_LARGE = len(DATA)

def upload_data(uploader, data):
    u = upload.Data(data, convergence=None)
    return uploader.upload(u)
def upload_filename(uploader, filename):
    u = upload.FileName(filename, convergence=None)
    return uploader.upload(u)
def upload_filehandle(uploader, fh):
    u = upload.FileHandle(fh, convergence=None)
    return uploader.upload(u)

class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
    def setUp(self):
        self.node = FakeClient(mode="good")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _check_small(self, newuri, size):
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.LiteralFileURI))
        self.failUnlessEqual(len(u.data), size)

    def _check_large(self, newuri, size):
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def get_data(self, size):
        return DATA[:size]

    def test_too_large(self):
        # we've removed the 4GiB share size limit (see ticket #346 for
        # details), but still have an 8-byte field, so the limit is now
        # 2**64, so make sure we reject files larger than that.
        k = 3; happy = 7; n = 10
        self.set_encoding_parameters(k, happy, n)
        big = k*(2**64)
        data1 = GiganticUploadable(big)
        d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
                            "This file is too large to be uploaded (data_size)",
                            self.u.upload, data1)
        data2 = GiganticUploadable(big-3)
        d.addCallback(lambda res:
                      self.shouldFail(FileTooLargeError,
                                      "test_too_large-data2",
                                      "This file is too large to be uploaded (offsets)",
                                      self.u.upload, data2))
        # I don't know where the actual limit is; it depends upon how large
        # the hash trees wind up. It's somewhere close to k*4GiB - ln2(size).
        return d

    def test_data_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_data_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_data_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_data_large_odd_segments(self):
        data = self.get_data(SIZE_LARGE)
        segsize = int(SIZE_LARGE / 2.5)
        # we want 3 segments, since that's not a power of two
        self.set_encoding_parameters(25, 25, 100, segsize)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_filehandle_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_filehandle_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_filehandle_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_filename_zero(self):
        fn = "Uploader-test_filename_zero.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_ZERO)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_filename_small(self):
        fn = "Uploader-test_filename_small.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_SMALL)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_filename_large(self):
        fn = "Uploader-test_filename_large.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_LARGE)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
    def make_node(self, mode, num_servers=10):
        self.node = FakeClient(mode, num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _check_large(self, newuri, size):
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def test_first_error(self):
        mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
        self.make_node(mode)
        self.set_encoding_parameters(k=25, happy=1, n=50)
        d = upload_data(self.u, DATA)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_first_error_all(self):
        self.make_node("first-fail")
        d = self.shouldFail(UploadUnhappinessError, "first_error_all",
                            "server selection failed",
                            upload_data, self.u, DATA)
        def _check((f,)):
            self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
            # there should also be a 'last failure was' message
            self.failUnlessIn("ServerError", str(f.value))
        d.addCallback(_check)
        return d

    def test_second_error(self):
        # We want to make sure we make it to a third pass. This means that
        # the first pass was insufficient to place all shares, and at least
        # one of the second-pass servers (other than the last one) accepted
        # a share (so we'll believe that a third pass will be useful). (If
        # everyone but the last server throws an error, then we'll send all
        # the remaining shares to the last server at the end of the second
        # pass, and if that succeeds, we won't make it to a third pass.)
        #
        # We can achieve this 97.5% of the time by using 40 servers, having
        # 39 of them fail on the second request, leaving only one to succeed
        # on the second request. (We need to keep the number of servers low
        # enough to ensure a second pass with 100 shares.)
        mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
        self.make_node(mode, 40)
        d = upload_data(self.u, DATA)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_second_error_all(self):
        self.make_node("second-fail")
        d = self.shouldFail(UploadUnhappinessError, "second_error_all",
                            "server selection failed",
                            upload_data, self.u, DATA)
        def _check((f,)):
            self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
            # there should also be a 'last failure was' message
            self.failUnlessIn("ServerError", str(f.value))
        d.addCallback(_check)
        return d

class FullServer(unittest.TestCase):
    def setUp(self):
        self.node = FakeClient(mode="full")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _should_fail(self, f):
        self.failUnless(isinstance(f, Failure) and f.check(UploadUnhappinessError), f)

    def test_data_large(self):
        data = DATA
        d = upload_data(self.u, data)
        d.addBoth(self._should_fail)
        return d

class ServerSelection(unittest.TestCase):

    def make_client(self, num_servers=50):
        self.node = FakeClient(mode="good", num_servers=num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def get_data(self, size):
        return DATA[:size]

    def _check_large(self, newuri, size):
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        p = {"k": k,
             "happy": happy,
             "n": n,
             "max_segment_size": max_segsize,
             }
        self.node.DEFAULT_ENCODING_PARAMETERS = p

    def test_one_each(self):
        # if we have 50 shares, and there are 50 servers, and they all accept
        # a share, we should get exactly one share per server

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(25, 30, 50)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for s in self.node.last_servers:
                allocated = s.allocated
                self.failUnlessEqual(len(allocated), 1)
                self.failUnlessEqual(s.queries, 1)
        d.addCallback(_check)
        return d

    def test_two_each(self):
        # if we have 100 shares, and there are 50 servers, and they all
        # accept all shares, we should get exactly two shares per server

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        # if there are 50 servers, then happy needs to be <= 50
        self.set_encoding_parameters(50, 50, 100)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for s in self.node.last_servers:
                allocated = s.allocated
                self.failUnlessEqual(len(allocated), 2)
                self.failUnlessEqual(s.queries, 2)
        d.addCallback(_check)
        return d

    def test_one_each_plus_one_extra(self):
        # if we have 51 shares, and there are 50 servers, then one server
        # gets two shares and the rest get just one

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(24, 41, 51)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            got_one = []
            got_two = []
            for s in self.node.last_servers:
                allocated = s.allocated
                self.failUnless(len(allocated) in (1,2), len(allocated))
                if len(allocated) == 1:
                    self.failUnlessEqual(s.queries, 1)
                    got_one.append(s)
                else:
                    self.failUnlessEqual(s.queries, 2)
                    got_two.append(s)
            self.failUnlessEqual(len(got_one), 49)
            self.failUnlessEqual(len(got_two), 1)
        d.addCallback(_check)
        return d

    def test_four_each(self):
        # if we have 200 shares, and there are 50 servers, then each server
        # gets 4 shares. The design goal is to accomplish this with only two
        # queries per server.

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        # if there are 50 servers, then happy should be no more than 50 if we
        # want this to work.
        self.set_encoding_parameters(100, 50, 200)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for s in self.node.last_servers:
                allocated = s.allocated
                self.failUnlessEqual(len(allocated), 4)
                self.failUnlessEqual(s.queries, 2)
        d.addCallback(_check)
        return d

    def test_three_of_ten(self):
        # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
        # 4+4+2

        self.make_client(3)
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 3, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            counts = {}
            for s in self.node.last_servers:
                allocated = s.allocated
                counts[len(allocated)] = counts.get(len(allocated), 0) + 1
            histogram = [counts.get(i, 0) for i in range(5)]
            self.failUnlessEqual(histogram, [0,0,0,2,1])
        d.addCallback(_check)
        return d

    def test_some_big_some_small(self):
        # 10 shares, 20 servers, but half the servers don't support a
        # share-size large enough for our file
        mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)])
        self.node = FakeClient(mode, num_servers=20)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 5, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            # we should have put one share each on the big servers, and zero
            # shares on the small servers
            total_allocated = 0
            for p in self.node.last_servers:
                if p.mode == "good":
                    self.failUnlessEqual(len(p.allocated), 1)
                elif p.mode == "small":
                    self.failUnlessEqual(len(p.allocated), 0)
                total_allocated += len(p.allocated)
            self.failUnlessEqual(total_allocated, 10)
        d.addCallback(_check)
        return d


class StorageIndex(unittest.TestCase):
    def test_params_must_matter(self):
        DATA = "I am some data"
        u = upload.Data(DATA, convergence="")
        eu = upload.EncryptAnUploadable(u)
        d1 = eu.get_storage_index()

        # CHK means the same data should encrypt the same way
        u = upload.Data(DATA, convergence="")
        eu = upload.EncryptAnUploadable(u)
        d1a = eu.get_storage_index()

        # but if we use a different convergence string it should be different
        u = upload.Data(DATA, convergence="wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt1 = eu.get_storage_index()

        # and if we add yet a different convergence it should be different again
        u = upload.Data(DATA, convergence="NOT wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt2 = eu.get_storage_index()

        # and if we use the first string again it should be the same as last time
        u = upload.Data(DATA, convergence="wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt1a = eu.get_storage_index()

        # and if we change the encoding parameters, it should be different
        # (from the same convergence string with different encoding parameters)
        u = upload.Data(DATA, convergence="")
        u.encoding_param_k = u.default_encoding_param_k + 1
        eu = upload.EncryptAnUploadable(u)
        d2 = eu.get_storage_index()

        # and if we use a random key, it should be different than the CHK
        u = upload.Data(DATA, convergence=None)
        eu = upload.EncryptAnUploadable(u)
        d3 = eu.get_storage_index()
        # and different from another instance
        u = upload.Data(DATA, convergence=None)
        eu = upload.EncryptAnUploadable(u)
        d4 = eu.get_storage_index()

        d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
        def _done(res):
            si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
            self.failUnlessEqual(si1, si1a)
            self.failIfEqual(si1, si2)
            self.failIfEqual(si1, si3)
            self.failIfEqual(si1, si4)
            self.failIfEqual(si3, si4)
            self.failIfEqual(si1salt1, si1)
            self.failIfEqual(si1salt1, si1salt2)
            self.failIfEqual(si1salt2, si1)
            self.failUnlessEqual(si1salt1, si1salt1a)
        d.addCallback(_done)
        return d

# copied from python docs because itertools.combinations was added in
# python 2.6 and we support >= 2.4.
def combinations(iterable, r):
    # combinations('ABCD', 2) --> AB AC AD BC BD CD
    # combinations(range(4), 3) --> 012 013 023 123
    pool = tuple(iterable)
    n = len(pool)
    if r > n:
        return
    indices = range(r)
    yield tuple(pool[i] for i in indices)
    while True:
        for i in reversed(range(r)):
            if indices[i] != i + n - r:
                break
        else:
            return
        indices[i] += 1
        for j in range(i+1, r):
            indices[j] = indices[j-1] + 1
        yield tuple(pool[i] for i in indices)

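# A quick sanity check of the backported combinations() (illustrative;
# evaluated at import time, like the module-level assert on DATA above):
assert list(combinations(range(3), 2)) == [(0,1), (0,2), (1,2)]
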
def is_happy_enough(servertoshnums, h, k):
    """ I calculate whether servertoshnums achieves happiness level h. I do
    this with a naïve "brute force search" approach. (See
    src/allmydata/util/happinessutil.py for a better algorithm.) """
    if len(servertoshnums) < h:
        return False
    # print "servertoshnums: ", servertoshnums, h, k
    for happysetcombo in combinations(servertoshnums.iterkeys(), h):
        # print "happysetcombo: ", happysetcombo
        for subsetcombo in combinations(happysetcombo, k):
            shnums = reduce(set.union, [ servertoshnums[s] for s in subsetcombo ])
            # print "subsetcombo: ", subsetcombo, ", shnums: ", shnums
            if len(shnums) < k:
                # print "NOT HAPPY", shnums, k
                return False
    # print "HAPPY"
    return True
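
# Examples (illustrative only): four servers each holding a distinct share
# satisfy h=4, k=3, while four servers all holding the same single share do
# not:
#   is_happy_enough({0: set([0]), 1: set([1]), 2: set([2]), 3: set([3])}, 4, 3) --> True
#   is_happy_enough({0: set([0]), 1: set([0]), 2: set([0]), 3: set([0])}, 4, 3) --> False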

class FakeServerTracker:
    def __init__(self, serverid, buckets):
        self._serverid = serverid
        self.buckets = buckets
    def get_serverid(self):
        return self._serverid

class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
    ShouldFailMixin):
    def find_all_shares(self, unused=None):
        """Locate shares on disk. Returns a dict that maps
        server to set of sharenums.
        """
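        # (e.g. {0: set([0, 1]), 1: set([2])} would mean server #0 holds
        # shares 0 and 1, and server #1 holds share 2 -- illustrative only)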
        assert self.g, "I tried to find a grid at self.g, but failed"
        servertoshnums = {} # k: server, v: set(shnum)

        for i, c in self.g.servers_by_number.iteritems():
            for (dirp, dirns, fns) in os.walk(c.sharedir):
                for fn in fns:
                    try:
                        sharenum = int(fn)
                    except ValueError:
                        # Whoops, I guess that's not a share file then.
                        pass
                    else:
                        servertoshnums.setdefault(i, set()).add(sharenum)

        return servertoshnums

    def _do_upload_with_broken_servers(self, servers_to_break):
        """
        I act like a normal upload, but before I send the results of
        Tahoe2ServerSelector to the Encoder, I break the first
        servers_to_break ServerTrackers in the upload_servers part of the
        return result.
        """
        assert self.g, "I tried to find a grid at self.g, but failed"
        broker = self.g.clients[0].storage_broker
        sh     = self.g.clients[0]._secret_holder
        data = upload.Data("data" * 10000, convergence="")
        data.encoding_param_k = 3
        data.encoding_param_happy = 4
        data.encoding_param_n = 10
        uploadable = upload.EncryptAnUploadable(data)
        encoder = encode.Encoder()
        encoder.set_encrypted_uploadable(uploadable)
        status = upload.UploadStatus()
        selector = upload.Tahoe2ServerSelector("dglev", "test", status)
        storage_index = encoder.get_param("storage_index")
        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        d = selector.get_shareholders(broker, sh, storage_index,
                                      share_size, block_size, num_segments,
                                      10, 3, 4)
        def _have_shareholders((upload_trackers, already_servers)):
            assert servers_to_break <= len(upload_trackers)
            for index in xrange(servers_to_break):
                tracker = list(upload_trackers)[index]
                for share in tracker.buckets.keys():
                    tracker.buckets[share].abort()
            buckets = {}
            servermap = already_servers.copy()
            for tracker in upload_trackers:
                buckets.update(tracker.buckets)
                for bucket in tracker.buckets:
                    servermap.setdefault(bucket, set()).add(tracker.get_serverid())
            encoder.set_shareholders(buckets, servermap)
            d = encoder.start()
            return d
        d.addCallback(_have_shareholders)
        return d

    def _has_happy_share_distribution(self):
        servertoshnums = self.find_all_shares()
        k = self.g.clients[0].DEFAULT_ENCODING_PARAMETERS['k']
        h = self.g.clients[0].DEFAULT_ENCODING_PARAMETERS['happy']
        return is_happy_enough(servertoshnums, h, k)

    def _add_server(self, server_number, readonly=False):
        assert self.g, "I tried to find a grid at self.g, but failed"
        ss = self.g.make_server(server_number, readonly)
        log.msg("just created a server, number: %s => %s" % (server_number, ss,))
        self.g.add_server(server_number, ss)

    def _add_server_with_share(self, server_number, share_number=None,
                               readonly=False):
        self._add_server(server_number, readonly)
        if share_number is not None:
            self._copy_share_to_server(share_number, server_number)


    def _copy_share_to_server(self, share_number, server_number):
        ss = self.g.servers_by_number[server_number]
        # Copy share i from the directory associated with the first
        # storage server to the directory associated with this one.
        assert self.g, "I tried to find a grid at self.g, but failed"
        assert self.shares, "I tried to find shares at self.shares, but failed"
        old_share_location = self.shares[share_number][2]
        new_share_location = os.path.join(ss.storedir, "shares")
        si = uri.from_string(self.uri).get_storage_index()
        new_share_location = os.path.join(new_share_location,
                                          storage_index_to_dir(si))
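        # (storage_index_to_dir() maps the SI to a nested path -- roughly a
        # two-character base32 prefix directory plus the full base32-encoded
        # SI -- matching the storage server's on-disk share layout.)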
        if not os.path.exists(new_share_location):
            os.makedirs(new_share_location)
        new_share_location = os.path.join(new_share_location,
                                          str(share_number))
        if old_share_location != new_share_location:
            shutil.copy(old_share_location, new_share_location)
        shares = self.find_uri_shares(self.uri)
        # Make sure that the storage server has the share.
        self.failUnless((share_number, ss.my_nodeid, new_share_location)
                        in shares)

    def _setup_grid(self):
        """
        I set up a NoNetworkGrid with a single server and client.
        """
        self.set_up_grid(num_clients=1, num_servers=1)

    def _setup_and_upload(self, **kwargs):
        """
        I set up a NoNetworkGrid with a single server and client,
        upload a file to it, store its uri in self.uri, and store its
        sharedata in self.shares.
        """
        self._setup_grid()
        client = self.g.clients[0]
        client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
        if "n" in kwargs and "k" in kwargs:
            client.DEFAULT_ENCODING_PARAMETERS['k'] = kwargs['k']
            client.DEFAULT_ENCODING_PARAMETERS['n'] = kwargs['n']
        data = upload.Data("data" * 10000, convergence="")
        self.data = data
        d = client.upload(data)
        def _store_uri(ur):
            self.uri = ur.uri
        d.addCallback(_store_uri)
        d.addCallback(lambda ign:
            self.find_uri_shares(self.uri))
        def _store_shares(shares):
            self.shares = shares
        d.addCallback(_store_shares)
        return d

    def test_configure_parameters(self):
        self.basedir = self.mktemp()
        hooks = {0: self._set_up_nodes_extra_config}
        self.set_up_grid(client_config_hooks=hooks)
        c0 = self.g.clients[0]

        DATA = "data" * 100
        u = upload.Data(DATA, convergence="")
        d = c0.upload(u)
        d.addCallback(lambda ur: c0.create_node_from_uri(ur.uri))
        m = monitor.Monitor()
        d.addCallback(lambda fn: fn.check(m))
        def _check(cr):
            data = cr.get_data()
            self.failUnlessEqual(data["count-shares-needed"], 7)
            self.failUnlessEqual(data["count-shares-expected"], 12)
        d.addCallback(_check)
        return d


    def _setUp(self, ns):
        # Used by test_happy_semantics and test_preexisting_share_behavior
        # to set up the grid.
        self.node = FakeClient(mode="good", num_servers=ns)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node


    def test_happy_semantics(self):
        self._setUp(2)
        DATA = upload.Data("kittens" * 10000, convergence="")
        # These parameters are unsatisfiable with only 2 servers.
        self.set_encoding_parameters(k=3, happy=5, n=10)
        d = self.shouldFail(UploadUnhappinessError, "test_happy_semantics",
                            "shares could be placed or found on only 2 "
                            "server(s). We were asked to place shares on "
                            "at least 5 server(s) such that any 3 of them "
                            "have enough shares to recover the file",
                            self.u.upload, DATA)
        # Let's reset the client to have 10 servers
        d.addCallback(lambda ign:
            self._setUp(10))
        # These parameters are satisfiable with 10 servers.
        d.addCallback(lambda ign:
            self.set_encoding_parameters(k=3, happy=5, n=10))
        d.addCallback(lambda ign:
            self.u.upload(DATA))
        # Let's reset the client to have 7 servers
        # (this is less than n, but more than h)
        d.addCallback(lambda ign:
            self._setUp(7))
        # These parameters are satisfiable with 7 servers.
        d.addCallback(lambda ign:
            self.set_encoding_parameters(k=3, happy=5, n=10))
        d.addCallback(lambda ign:
            self.u.upload(DATA))
        return d

    def test_aborted_shares(self):
        self.basedir = "upload/EncodingParameters/aborted_shares"
        self.set_up_grid(num_servers=4)
        c = self.g.clients[0]
        DATA = upload.Data(100* "kittens", convergence="")
        # These parameters are unsatisfiable with only 4 servers, but should
        # work with 5, as long as the original 4 are not stuck in the open
        # BucketWriter state (open() but not closed).
940         parms = {"k":2, "happy":5, "n":5, "max_segment_size": 1*MiB}
941         c.DEFAULT_ENCODING_PARAMETERS = parms
942         d = self.shouldFail(UploadUnhappinessError, "test_aborted_shares",
943                             "shares could be placed on only 4 "
944                             "server(s) such that any 2 of them have enough "
945                             "shares to recover the file, but we were asked "
946                             "to place shares on at least 5 such servers",
947                             c.upload, DATA)
948         # now add the 5th server
949         d.addCallback(lambda ign: self._add_server(4, False))
950         # and this time the upload ought to succeed
951         d.addCallback(lambda ign: c.upload(DATA))
952         d.addCallback(lambda ign:
953             self.failUnless(self._has_happy_share_distribution()))
954         return d
955
956
957     def test_problem_layout_comment_52(self):
958         def _basedir():
959             self.basedir = self.mktemp()
960         _basedir()
961         # This scenario is at
962         # http://allmydata.org/trac/tahoe/ticket/778#comment:52
963         #
964         # The scenario in comment:52 proposes that we have a layout
965         # like:
966         # server 0: shares 1 - 9
967         # server 1: share 0, read-only
968         # server 2: share 0, read-only
969         # server 3: share 0, read-only
970         # To get access to the shares, we will first upload to one
971         # server, which will then have shares 0 - 9. We'll then
972         # add three new servers, configure them to not accept any new
973         # shares, then write share 0 directly into the serverdir of each,
974         # and then remove share 0 from server 0 in the same way.
975         # Then each of servers 1 - 3 will report that they have share 0,
976         # and will not accept any new share, while server 0 will report that
977         # it has shares 1 - 9 and will accept new shares.
978         # We'll then set 'happy' = 4, and see that an upload fails
979         # (as it should)
980         d = self._setup_and_upload()
981         d.addCallback(lambda ign:
982             self._add_server_with_share(server_number=1, share_number=0,
983                                         readonly=True))
984         d.addCallback(lambda ign:
985             self._add_server_with_share(server_number=2, share_number=0,
986                                         readonly=True))
987         d.addCallback(lambda ign:
988             self._add_server_with_share(server_number=3, share_number=0,
989                                         readonly=True))
990         # Remove the first share from server 0.
991         def _remove_share_0_from_server_0():
992             share_location = self.shares[0][2]
993             os.remove(share_location)
994         d.addCallback(lambda ign:
995             _remove_share_0_from_server_0())
996         # Set happy = 4 in the client.
997         def _prepare():
998             client = self.g.clients[0]
999             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1000             return client
1001         d.addCallback(lambda ign:
1002             _prepare())
1003         # Uploading data should fail
1004         d.addCallback(lambda client:
1005             self.shouldFail(UploadUnhappinessError,
1006                             "test_problem_layout_comment_52_test_1",
1007                             "shares could be placed or found on 4 server(s), "
1008                             "but they are not spread out evenly enough to "
1009                             "ensure that any 3 of these servers would have "
1010                             "enough shares to recover the file. "
1011                             "We were asked to place shares on at "
1012                             "least 4 servers such that any 3 of them have "
1013                             "enough shares to recover the file",
1014                             client.upload, upload.Data("data" * 10000,
1015                                                        convergence="")))
1016
1017         # Do comment:52, but like this:
1018         # server 2: empty
1019         # server 3: share 0, read-only
1020         # server 1: share 0, read-only
1021         # server 0: shares 0-9
1022         d.addCallback(lambda ign:
1023             _basedir())
1024         d.addCallback(lambda ign:
1025             self._setup_and_upload())
1026         d.addCallback(lambda ign:
1027             self._add_server(server_number=2))
1028         d.addCallback(lambda ign:
1029             self._add_server_with_share(server_number=3, share_number=0,
1030                                         readonly=True))
1031         d.addCallback(lambda ign:
1032             self._add_server_with_share(server_number=1, share_number=0,
1033                                         readonly=True))
1034         def _prepare2():
1035             client = self.g.clients[0]
1036             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1037             return client
1038         d.addCallback(lambda ign:
1039             _prepare2())
1040         d.addCallback(lambda client:
1041             self.shouldFail(UploadUnhappinessError,
1042                             "test_problem_layout_comment_52_test_2",
1043                             "shares could be placed on only 3 server(s) such "
1044                             "that any 3 of them have enough shares to recover "
1045                             "the file, but we were asked to place shares on "
1046                             "at least 4 such servers.",
1047                             client.upload, upload.Data("data" * 10000,
1048                                                        convergence="")))
1049         return d
1050
1051
1052     def test_problem_layout_comment_53(self):
1053         # This scenario is at
1054         # http://allmydata.org/trac/tahoe/ticket/778#comment:53
1055         #
1056         # Set up the grid to have one server
1057         def _change_basedir(ign):
1058             self.basedir = self.mktemp()
1059         _change_basedir(None)
1060         # We start by uploading all of the shares to one server.
1061         # Next, we'll add three new servers to our NoNetworkGrid. We'll add
1062         # one share from our initial upload to each of these.
1063         # The counterintuitive ordering of the share numbers is to deal with
1064         # the permuting of these servers -- distributing the shares this
1065         # way ensures that the Tahoe2ServerSelector sees them in the order
1066         # described below.
1067         d = self._setup_and_upload()
1068         d.addCallback(lambda ign:
1069             self._add_server_with_share(server_number=1, share_number=2))
1070         d.addCallback(lambda ign:
1071             self._add_server_with_share(server_number=2, share_number=0))
1072         d.addCallback(lambda ign:
1073             self._add_server_with_share(server_number=3, share_number=1))
1074         # So, we now have the following layout:
1075         # server 0: shares 0 - 9
1076         # server 1: share 2
1077         # server 2: share 0
1078         # server 3: share 1
1079         # We change the 'happy' parameter in the client to 4.
1080         # The Tahoe2ServerSelector will see the servers permuted as:
1081         # 2, 3, 1, 0
1082         # Ideally, a reupload of our original data should work.
1083         def _reset_encoding_parameters(ign, happy=4):
1084             client = self.g.clients[0]
1085             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1086             return client
1087         d.addCallback(_reset_encoding_parameters)
1088         d.addCallback(lambda client:
1089             client.upload(upload.Data("data" * 10000, convergence="")))
1090         d.addCallback(lambda ign:
1091             self.failUnless(self._has_happy_share_distribution()))
1092
1093
1094         # This scenario is basically comment:53, but changed so that the
1095         # Tahoe2ServerSelector sees the server with all of the shares before
1096         # any of the other servers.
1097         # The layout is:
1098         # server 2: shares 0 - 9
1099         # server 3: share 0
1100         # server 1: share 1
1101         # server 4: share 2
1102         # The Tahoe2ServerSelector sees the servers permuted as:
1103         # 2, 3, 1, 4
1104         # Note that server 0 has been replaced by server 4; this makes it
1105         # easier to ensure that the last server seen by Tahoe2ServerSelector
1106         # has only one share.
1107         d.addCallback(_change_basedir)
1108         d.addCallback(lambda ign:
1109             self._setup_and_upload())
1110         d.addCallback(lambda ign:
1111             self._add_server_with_share(server_number=2, share_number=0))
1112         d.addCallback(lambda ign:
1113             self._add_server_with_share(server_number=3, share_number=1))
1114         d.addCallback(lambda ign:
1115             self._add_server_with_share(server_number=1, share_number=2))
1116         # Copy all of the other shares to server number 2
1117         def _copy_shares(ign):
1118             for i in xrange(0, 10):
1119                 self._copy_share_to_server(i, 2)
1120         d.addCallback(_copy_shares)
1121         # Remove the first server, and add a placeholder with share 0
1122         d.addCallback(lambda ign:
1123             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1124         d.addCallback(lambda ign:
1125             self._add_server_with_share(server_number=4, share_number=0))
1126         # Now try uploading.
1127         d.addCallback(_reset_encoding_parameters)
1128         d.addCallback(lambda client:
1129             client.upload(upload.Data("data" * 10000, convergence="")))
1130         d.addCallback(lambda ign:
1131             self.failUnless(self._has_happy_share_distribution()))
1132
1133
1134         # Try the same thing, but with empty servers after the first one
1135         # We want to make sure that Tahoe2ServerSelector will redistribute
1136         # shares as necessary, not simply discover an existing layout.
1137         # The layout is:
1138         # server 2: shares 0 - 9
1139         # server 3: empty
1140         # server 1: empty
1141         # server 4: empty
1142         d.addCallback(_change_basedir)
1143         d.addCallback(lambda ign:
1144             self._setup_and_upload())
1145         d.addCallback(lambda ign:
1146             self._add_server(server_number=2))
1147         d.addCallback(lambda ign:
1148             self._add_server(server_number=3))
1149         d.addCallback(lambda ign:
1150             self._add_server(server_number=1))
1151         d.addCallback(lambda ign:
1152             self._add_server(server_number=4))
1153         d.addCallback(_copy_shares)
1154         d.addCallback(lambda ign:
1155             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1156         d.addCallback(_reset_encoding_parameters)
1157         d.addCallback(lambda client:
1158             client.upload(upload.Data("data" * 10000, convergence="")))
1159         # Make sure that only as many shares as necessary to satisfy
1160         # servers of happiness were pushed.
1161         d.addCallback(lambda results:
1162             self.failUnlessEqual(results.pushed_shares, 3))
1163         d.addCallback(lambda ign:
1164             self.failUnless(self._has_happy_share_distribution()))
1165         return d
1166
1167     def test_problem_layout_ticket_1124(self):
1168         self.basedir = self.mktemp()
1169         d = self._setup_and_upload(k=2, n=4)
1170
1171         # server 0: shares 0, 1, 2, 3
1172         # server 1: shares 0, 3
1173         # server 2: share 1
1174         # server 3: share 2
1175         # With this layout, an upload should just be satisfied that the current distribution is good enough, right?
1176         def _setup(ign):
1177             self._add_server_with_share(server_number=0, share_number=None)
1178             self._add_server_with_share(server_number=1, share_number=0)
1179             self._add_server_with_share(server_number=2, share_number=1)
1180             self._add_server_with_share(server_number=3, share_number=2)
1181             # Copy shares
1182             self._copy_share_to_server(3, 1)
1183             client = self.g.clients[0]
1184             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1185             return client
1186
1187         d.addCallback(_setup)
1188         d.addCallback(lambda client:
1189             client.upload(upload.Data("data" * 10000, convergence="")))
1190         d.addCallback(lambda ign:
1191             self.failUnless(self._has_happy_share_distribution()))
1192         return d
1193     test_problem_layout_ticket_1124.todo = "Fix this after 1.7.1 release."
1194
1195     def test_happiness_with_some_readonly_servers(self):
1196         # Try the following layout
1197         # server 2: shares 0-9
1198         # server 4: share 0, read-only
1199         # server 3: share 1, read-only
1200         # server 1: share 2, read-only
1201         self.basedir = self.mktemp()
1202         d = self._setup_and_upload()
1203         d.addCallback(lambda ign:
1204             self._add_server_with_share(server_number=2, share_number=0))
1205         d.addCallback(lambda ign:
1206             self._add_server_with_share(server_number=3, share_number=1,
1207                                         readonly=True))
1208         d.addCallback(lambda ign:
1209             self._add_server_with_share(server_number=1, share_number=2,
1210                                         readonly=True))
1211         # Copy all of the other shares to server number 2
1212         def _copy_shares(ign):
1213             for i in xrange(1, 10):
1214                 self._copy_share_to_server(i, 2)
1215         d.addCallback(_copy_shares)
1216         # Remove server 0, and add another in its place
1217         d.addCallback(lambda ign:
1218             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1219         d.addCallback(lambda ign:
1220             self._add_server_with_share(server_number=4, share_number=0,
1221                                         readonly=True))
1222         def _reset_encoding_parameters(ign, happy=4):
1223             client = self.g.clients[0]
1224             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1225             return client
1226         d.addCallback(_reset_encoding_parameters)
1227         d.addCallback(lambda client:
1228             client.upload(upload.Data("data" * 10000, convergence="")))
1229         d.addCallback(lambda ign:
1230             self.failUnless(self._has_happy_share_distribution()))
1231         return d
1232
1233
1234     def test_happiness_with_all_readonly_servers(self):
1235         # server 3: share 1, read-only
1236         # server 1: share 2, read-only
1237         # server 2: shares 0-9, read-only
1238         # server 4: share 0, read-only
1239         # The idea with this test is to make sure that the survey of
1240         # read-only servers doesn't undercount servers of happiness
1241         self.basedir = self.mktemp()
1242         d = self._setup_and_upload()
1243         d.addCallback(lambda ign:
1244             self._add_server_with_share(server_number=4, share_number=0,
1245                                         readonly=True))
1246         d.addCallback(lambda ign:
1247             self._add_server_with_share(server_number=3, share_number=1,
1248                                         readonly=True))
1249         d.addCallback(lambda ign:
1250             self._add_server_with_share(server_number=1, share_number=2,
1251                                         readonly=True))
1252         d.addCallback(lambda ign:
1253             self._add_server_with_share(server_number=2, share_number=0,
1254                                         readonly=True))
1255         def _copy_shares(ign):
1256             for i in xrange(1, 10):
1257                 self._copy_share_to_server(i, 2)
1258         d.addCallback(_copy_shares)
1259         d.addCallback(lambda ign:
1260             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1261         def _reset_encoding_parameters(ign, happy=4):
1262             client = self.g.clients[0]
1263             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1264             return client
1265         d.addCallback(_reset_encoding_parameters)
1266         d.addCallback(lambda client:
1267             client.upload(upload.Data("data" * 10000, convergence="")))
1268         d.addCallback(lambda ign:
1269             self.failUnless(self._has_happy_share_distribution()))
1270         return d
1271
1272
1273     def test_dropped_servers_in_encoder(self):
1274         # The Encoder does its own "servers_of_happiness" check if it
1275         # happens to lose a bucket during an upload (it assumes that
1276         # the layout presented to it satisfies "servers_of_happiness"
1277         # until a failure occurs)
1278         #
1279         # This test simulates an upload where servers break after server
1280         # selection, but before they are written to.
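        # (The integer argument to _do_upload_with_broken_servers is the
        # number of servers to break after selection but before the write;
        # the helper itself is defined elsewhere in this file.)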
1281         def _set_basedir(ign=None):
1282             self.basedir = self.mktemp()
1283         _set_basedir()
1284         d = self._setup_and_upload()
1285         # Add 5 servers
1286         def _do_server_setup(ign):
1287             self._add_server(server_number=1)
1288             self._add_server(server_number=2)
1289             self._add_server(server_number=3)
1290             self._add_server(server_number=4)
1291             self._add_server(server_number=5)
1292         d.addCallback(_do_server_setup)
1293         # remove the original server
1294         # (necessary to ensure that the Tahoe2ServerSelector will distribute
1295         #  all the shares)
1296         def _remove_server(ign):
1297             server = self.g.servers_by_number[0]
1298             self.g.remove_server(server.my_nodeid)
1299         d.addCallback(_remove_server)
1300         # This should succeed; we still have 4 servers, and the
1301         # happiness of the upload is 4.
1302         d.addCallback(lambda ign:
1303             self._do_upload_with_broken_servers(1))
1304         # Now, do the same thing over again, but drop 2 servers instead
1305         # of 1. This should fail, because servers_of_happiness is 4 and
1306         # we can't satisfy that.
1307         d.addCallback(_set_basedir)
1308         d.addCallback(lambda ign:
1309             self._setup_and_upload())
1310         d.addCallback(_do_server_setup)
1311         d.addCallback(_remove_server)
1312         d.addCallback(lambda ign:
1313             self.shouldFail(UploadUnhappinessError,
1314                             "test_dropped_servers_in_encoder",
1315                             "shares could be placed on only 3 server(s) "
1316                             "such that any 3 of them have enough shares to "
1317                             "recover the file, but we were asked to place "
1318                             "shares on at least 4",
1319                             self._do_upload_with_broken_servers, 2))
1320         # Now do the same thing over again, but make some of the servers
1321         # readonly, break some of the ones that aren't, and make sure that
1322         # happiness accounting is preserved.
1323         d.addCallback(_set_basedir)
1324         d.addCallback(lambda ign:
1325             self._setup_and_upload())
1326         def _do_server_setup_2(ign):
1327             self._add_server(1)
1328             self._add_server(2)
1329             self._add_server(3)
1330             self._add_server_with_share(4, 7, readonly=True)
1331             self._add_server_with_share(5, 8, readonly=True)
1332         d.addCallback(_do_server_setup_2)
1333         d.addCallback(_remove_server)
1334         d.addCallback(lambda ign:
1335             self._do_upload_with_broken_servers(1))
1336         d.addCallback(_set_basedir)
1337         d.addCallback(lambda ign:
1338             self._setup_and_upload())
1339         d.addCallback(_do_server_setup_2)
1340         d.addCallback(_remove_server)
1341         d.addCallback(lambda ign:
1342             self.shouldFail(UploadUnhappinessError,
1343                             "test_dropped_servers_in_encoder",
1344                             "shares could be placed on only 3 server(s) "
1345                             "such that any 3 of them have enough shares to "
1346                             "recover the file, but we were asked to place "
1347                             "shares on at least 4",
1348                             self._do_upload_with_broken_servers, 2))
1349         return d
1350
1351
1352     def test_merge_servers(self):
1353         # merge_servers merges a set of upload server trackers with a
1354         # dict of shnum -> set(serverids) mappings.
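        # For example (an illustrative sketch, using the same
        # FakeServerTracker helper that the assertions below exercise):
        #
        #   merge_servers({1: set(["server1"])},
        #                 set([FakeServerTracker("server2", [2])]))
        #   => {1: set(["server1"]), 2: set(["server2"])}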
1355         shares = {
1356                     1 : set(["server1"]),
1357                     2 : set(["server2"]),
1358                     3 : set(["server3"]),
1359                     4 : set(["server4", "server5"]),
1360                     5 : set(["server1", "server2"]),
1361                  }
1362         # If not provided with an upload_servers argument, it should just
1363         # return the first argument unchanged.
1364         self.failUnlessEqual(shares, merge_servers(shares, set([])))
1365         trackers = []
1366         for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1367             t = FakeServerTracker(server, [i])
1368             trackers.append(t)
1369         expected = {
1370                     1 : set(["server1"]),
1371                     2 : set(["server2"]),
1372                     3 : set(["server3"]),
1373                     4 : set(["server4", "server5"]),
1374                     5 : set(["server1", "server2", "server5"]),
1375                     6 : set(["server6"]),
1376                     7 : set(["server7"]),
1377                     8 : set(["server8"]),
1378                    }
1379         self.failUnlessEqual(expected, merge_servers(shares, set(trackers)))
1380         shares2 = {}
1381         expected = {
1382                     5 : set(["server5"]),
1383                     6 : set(["server6"]),
1384                     7 : set(["server7"]),
1385                     8 : set(["server8"]),
1386                    }
1387         self.failUnlessEqual(expected, merge_servers(shares2, set(trackers)))
1388         shares3 = {}
1389         trackers = []
1390         expected = {}
1391         for (i, server) in [(i, "server%d" % i) for i in xrange(10)]:
1392             shares3[i] = set([server])
1393             t = FakeServerTracker(server, [i])
1394             trackers.append(t)
1395             expected[i] = set([server])
1396         self.failUnlessEqual(expected, merge_servers(shares3, set(trackers)))
1397
1398
1399     def test_servers_of_happiness_utility_function(self):
1400         # These tests are concerned with the servers_of_happiness()
1401         # utility function, and its underlying matching algorithm. Other
1402         # aspects of the servers_of_happiness behavior are tested
1403         # elsewhere. These tests exist to ensure that
1404         # servers_of_happiness doesn't under- or overcount the happiness
1405         # value for given inputs.
1406
1407         # servers_of_happiness expects a dict of
1408         # shnum => set(serverids) as a preexisting shares argument.
1409         test1 = {
1410                  1 : set(["server1"]),
1411                  2 : set(["server2"]),
1412                  3 : set(["server3"]),
1413                  4 : set(["server4"])
1414                 }
1415         happy = servers_of_happiness(test1)
1416         self.failUnlessEqual(4, happy)
1417         test1[4] = set(["server1"])
1418         # We've added a duplicate server, so now servers_of_happiness
1419         # should be 3 instead of 4.
1420         happy = servers_of_happiness(test1)
1421         self.failUnlessEqual(3, happy)
1422         # The second argument of merge_servers should be a set of objects with
1423         # serverid and buckets as attributes. In actual use, these will be
1424         # ServerTracker instances, but for testing it is fine to make a
1425         # FakeServerTracker whose job is to hold those instance variables to
1426         # test that part.
1427         trackers = []
1428         for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1429             t = FakeServerTracker(server, [i])
1430             trackers.append(t)
1431         # Recall that test1 is a server layout with servers_of_happiness
1432         # = 3.  Since there isn't any overlap between the shnum ->
1433         # set([serverid]) correspondences in test1 and those in trackers,
1434         # the result here should be 7.
1435         test2 = merge_servers(test1, set(trackers))
1436         happy = servers_of_happiness(test2)
1437         self.failUnlessEqual(7, happy)
1438         # Now add an overlapping server to trackers. This is redundant,
1439         # so it should not cause the previously reported happiness value
1440         # to change.
1441         t = FakeServerTracker("server1", [1])
1442         trackers.append(t)
1443         test2 = merge_servers(test1, set(trackers))
1444         happy = servers_of_happiness(test2)
1445         self.failUnlessEqual(7, happy)
1446         test = {}
1447         happy = servers_of_happiness(test)
1448         self.failUnlessEqual(0, happy)
1449         # Test a more substantial overlap between the trackers and the
1450         # existing assignments.
1451         test = {
1452             1 : set(['server1']),
1453             2 : set(['server2']),
1454             3 : set(['server3']),
1455             4 : set(['server4']),
1456         }
1457         trackers = []
1458         t = FakeServerTracker('server5', [4])
1459         trackers.append(t)
1460         t = FakeServerTracker('server6', [3, 5])
1461         trackers.append(t)
1462         # The value returned by servers_of_happiness is the size
1463         # of a maximum matching in the bipartite graph that
1464         # servers_of_happiness() makes between serverids and share
1465         # numbers. It should find something like this:
1466         # (server 1, share 1)
1467         # (server 2, share 2)
1468         # (server 3, share 3)
1469         # (server 5, share 4)
1470         # (server 6, share 5)
1471         #
1472         # and, since there are 5 edges in this matching, it should
1473         # return 5.
1474         test2 = merge_servers(test, set(trackers))
1475         happy = servers_of_happiness(test2)
1476         self.failUnlessEqual(5, happy)
1477         # Zooko's first puzzle:
1478         # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:156)
1479         #
1480         # server 1: shares 0, 1
1481         # server 2: shares 1, 2
1482         # server 3: share 2
1483         #
1484         # This should yield happiness of 3.
1485         test = {
1486             0 : set(['server1']),
1487             1 : set(['server1', 'server2']),
1488             2 : set(['server2', 'server3']),
1489         }
1490         self.failUnlessEqual(3, servers_of_happiness(test))
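        # (A naive greedy assignment could hand share 1 to server 1 and
        # share 2 to server 2, stranding share 0 and reporting only 2;
        # the maximum-matching formulation avoids that undercount.)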
1491         # Zooko's second puzzle:
1492         # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:158)
1493         #
1494         # server 1: shares 0, 1
1495         # server 2: share 1
1496         #
1497         # This should yield happiness of 2.
1498         test = {
1499             0 : set(['server1']),
1500             1 : set(['server1', 'server2']),
1501         }
1502         self.failUnlessEqual(2, servers_of_happiness(test))
1503
1504
1505     def test_shares_by_server(self):
1506         test = dict([(i, set(["server%d" % i])) for i in xrange(1, 5)])
1507         sbs = shares_by_server(test)
1508         self.failUnlessEqual(set([1]), sbs["server1"])
1509         self.failUnlessEqual(set([2]), sbs["server2"])
1510         self.failUnlessEqual(set([3]), sbs["server3"])
1511         self.failUnlessEqual(set([4]), sbs["server4"])
1512         test1 = {
1513                     1 : set(["server1"]),
1514                     2 : set(["server1"]),
1515                     3 : set(["server1"]),
1516                     4 : set(["server2"]),
1517                     5 : set(["server2"])
1518                 }
1519         sbs = shares_by_server(test1)
1520         self.failUnlessEqual(set([1, 2, 3]), sbs["server1"])
1521         self.failUnlessEqual(set([4, 5]), sbs["server2"])
1522         # This should fail unless the serverid part of the mapping is a set
1523         test2 = {1: "server1"}
1524         self.shouldFail(AssertionError,
1525                        "test_shares_by_server",
1526                        "",
1527                        shares_by_server, test2)
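    # For reference, shares_by_server inverts a shnum -> set(serverids)
    # mapping into serverid -> set(shnums). A minimal sketch of the idea
    # (the real implementation lives in allmydata.util.happinessutil):
    #
    #   def shares_by_server(servermap):
    #       ret = {}
    #       for shnum, serverids in servermap.items():
    #           assert isinstance(serverids, set)
    #           for serverid in serverids:
    #               ret.setdefault(serverid, set()).add(shnum)
    #       return ret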
1528
1529
1530     def test_existing_share_detection(self):
1531         self.basedir = self.mktemp()
1532         d = self._setup_and_upload()
1533         # Our final setup should look like this:
1534         # server 1: shares 0 - 9, read-only
1535         # server 2: empty
1536         # server 3: empty
1537         # server 4: empty
1538         # The purpose of this test is to make sure that the server selector
1539         # knows about the shares on server 1, even though it is read-only.
1540         # It used to simply filter these out, which would cause the test
1541         # to fail when servers_of_happiness = 4.
1542         d.addCallback(lambda ign:
1543             self._add_server_with_share(1, 0, True))
1544         d.addCallback(lambda ign:
1545             self._add_server(2))
1546         d.addCallback(lambda ign:
1547             self._add_server(3))
1548         d.addCallback(lambda ign:
1549             self._add_server(4))
1550         def _copy_shares(ign):
1551             for i in xrange(1, 10):
1552                 self._copy_share_to_server(i, 1)
1553         d.addCallback(_copy_shares)
1554         d.addCallback(lambda ign:
1555             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1556         def _prepare_client(ign):
1557             client = self.g.clients[0]
1558             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1559             return client
1560         d.addCallback(_prepare_client)
1561         d.addCallback(lambda client:
1562             client.upload(upload.Data("data" * 10000, convergence="")))
1563         d.addCallback(lambda ign:
1564             self.failUnless(self._has_happy_share_distribution()))
1565         return d
1566
1567
1568     def test_query_counting(self):
1569         # If server selection fails, Tahoe2ServerSelector prints out a lot
1570         # of helpful diagnostic information, including query stats.
1571         # This test helps make sure that this information is accurate.
1572         self.basedir = self.mktemp()
1573         d = self._setup_and_upload()
1574         def _setup(ign):
1575             for i in xrange(1, 11):
1576                 self._add_server(server_number=i)
1577             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1578             c = self.g.clients[0]
1579             # We set happy to an unsatisfiable value so that we can check the
1580             # counting in the exception message. The same progress message
1581             # is also used when the upload is successful, but in that case it
1582             # only gets written to a log, so we can't see what it says.
1583             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1584             return c
1585         d.addCallback(_setup)
1586         d.addCallback(lambda c:
1587             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1588                             "10 queries placed some shares",
1589                             c.upload, upload.Data("data" * 10000,
1590                                                   convergence="")))
1591         # Now try with some readonly servers. We want to make sure that
1592         # the readonly server share discovery phase is counted correctly.
1593         def _reset(ign):
1594             self.basedir = self.mktemp()
1595             self.g = None
1596         d.addCallback(_reset)
1597         d.addCallback(lambda ign:
1598             self._setup_and_upload())
1599         def _then(ign):
1600             for i in xrange(1, 11):
1601                 self._add_server(server_number=i)
1602             self._add_server(server_number=11, readonly=True)
1603             self._add_server(server_number=12, readonly=True)
1604             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1605             c = self.g.clients[0]
1606             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1607             return c
1608         d.addCallback(_then)
1609         d.addCallback(lambda c:
1610             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1611                             "2 placed none (of which 2 placed none due to "
1612                             "the server being full",
1613                             c.upload, upload.Data("data" * 10000,
1614                                                   convergence="")))
1615         # Now try the case where the upload process finds a bunch of the
1616         # shares that it wants to place on the first server, including
1617         # the one that it wanted to allocate there. Though no shares will
1618         # be allocated by this request, it should still count as
1619         # productive, since it caused some homeless shares to be
1620         # removed.
1621         d.addCallback(_reset)
1622         d.addCallback(lambda ign:
1623             self._setup_and_upload())
1624
1625         def _next(ign):
1626             for i in xrange(1, 11):
1627                 self._add_server(server_number=i)
1628             # Copy all of the shares to server 9, since that will be
1629             # the first one that the selector sees.
1630             for i in xrange(10):
1631                 self._copy_share_to_server(i, 9)
1632             # Remove server 0, and its contents
1633             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1634             # Make happiness unsatisfiable
1635             c = self.g.clients[0]
1636             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1637             return c
1638         d.addCallback(_next)
1639         d.addCallback(lambda c:
1640             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1641                             "1 queries placed some shares",
1642                             c.upload, upload.Data("data" * 10000,
1643                                                   convergence="")))
1644         return d
1645
1646
1647     def test_upper_limit_on_readonly_queries(self):
1648         self.basedir = self.mktemp()
1649         d = self._setup_and_upload()
1650         def _then(ign):
1651             for i in xrange(1, 11):
1652                 self._add_server(server_number=i, readonly=True)
1653             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1654             c = self.g.clients[0]
1655             c.DEFAULT_ENCODING_PARAMETERS['k'] = 2
1656             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1657             c.DEFAULT_ENCODING_PARAMETERS['n'] = 4
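            # With n = 4, the upload should probe at most 8 read-only
            # servers (presumably 2*n) even though ten are available;
            # the shouldFail below checks for exactly that count in the
            # error message.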
1658             return c
1659         d.addCallback(_then)
1660         d.addCallback(lambda client:
1661             self.shouldFail(UploadUnhappinessError,
1662                             "test_upper_limit_on_readonly_queries",
1663                             "sent 8 queries to 8 servers",
1664                             client.upload,
1665                             upload.Data('data' * 10000, convergence="")))
1666         return d
1667
1668
1669     def test_exception_messages_during_server_selection(self):
1670         # server 1: read-only, no shares
1671         # server 2: read-only, no shares
1672         # server 3: read-only, no shares
1673         # server 4: read-only, no shares
1674         # server 5: read-only, no shares
1675         # This will fail, but we want to make sure that the log messages
1676         # are informative about why it has failed.
1677         self.basedir = self.mktemp()
1678         d = self._setup_and_upload()
1679         d.addCallback(lambda ign:
1680             self._add_server(server_number=1, readonly=True))
1681         d.addCallback(lambda ign:
1682             self._add_server(server_number=2, readonly=True))
1683         d.addCallback(lambda ign:
1684             self._add_server(server_number=3, readonly=True))
1685         d.addCallback(lambda ign:
1686             self._add_server(server_number=4, readonly=True))
1687         d.addCallback(lambda ign:
1688             self._add_server(server_number=5, readonly=True))
1689         d.addCallback(lambda ign:
1690             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1691         def _reset_encoding_parameters(ign, happy=4):
1692             client = self.g.clients[0]
1693             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1694             return client
1695         d.addCallback(_reset_encoding_parameters)
1696         d.addCallback(lambda client:
1697             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1698                             "placed 0 shares out of 10 "
1699                             "total (10 homeless), want to place shares on at "
1700                             "least 4 servers such that any 3 of them have "
1701                             "enough shares to recover the file, "
1702                             "sent 5 queries to 5 servers, 0 queries placed "
1703                             "some shares, 5 placed none "
1704                             "(of which 5 placed none due to the server being "
1705                             "full and 0 placed none due to an error)",
1706                             client.upload,
1707                             upload.Data("data" * 10000, convergence="")))
1708
1709
1710         # server 1: read-only, no shares
1711         # server 2: broken, no shares
1712         # server 3: read-only, no shares
1713         # server 4: read-only, no shares
1714         # server 5: read-only, no shares
1715         def _reset(ign):
1716             self.basedir = self.mktemp()
1717         d.addCallback(_reset)
1718         d.addCallback(lambda ign:
1719             self._setup_and_upload())
1720         d.addCallback(lambda ign:
1721             self._add_server(server_number=1, readonly=True))
1722         d.addCallback(lambda ign:
1723             self._add_server(server_number=2))
1724         def _break_server_2(ign):
1725             serverid = self.g.servers_by_number[2].my_nodeid
1726             self.g.break_server(serverid)
1727         d.addCallback(_break_server_2)
1728         d.addCallback(lambda ign:
1729             self._add_server(server_number=3, readonly=True))
1730         d.addCallback(lambda ign:
1731             self._add_server(server_number=4, readonly=True))
1732         d.addCallback(lambda ign:
1733             self._add_server(server_number=5, readonly=True))
1734         d.addCallback(lambda ign:
1735             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1736         d.addCallback(_reset_encoding_parameters)
1737         d.addCallback(lambda client:
1738             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1739                             "placed 0 shares out of 10 "
1740                             "total (10 homeless), want to place shares on at "
1741                             "least 4 servers such that any 3 of them have "
1742                             "enough shares to recover the file, "
1743                             "sent 5 queries to 5 servers, 0 queries placed "
1744                             "some shares, 5 placed none "
1745                             "(of which 4 placed none due to the server being "
1746                             "full and 1 placed none due to an error)",
1747                             client.upload,
1748                             upload.Data("data" * 10000, convergence="")))
1749         # server 0, server 1 = empty, accepting shares
1750         # This should place all of the shares, but still fail with happy=4.
1751         # We want to make sure that the exception message is worded correctly.
1752         d.addCallback(_reset)
1753         d.addCallback(lambda ign:
1754             self._setup_grid())
1755         d.addCallback(lambda ign:
1756             self._add_server(server_number=1))
1757         d.addCallback(_reset_encoding_parameters)
1758         d.addCallback(lambda client:
1759             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1760                             "shares could be placed or found on only 2 "
1761                             "server(s). We were asked to place shares on at "
1762                             "least 4 server(s) such that any 3 of them have "
1763                             "enough shares to recover the file.",
1764                             client.upload, upload.Data("data" * 10000,
1765                                                        convergence="")))
1766         # servers 0 - 4 = empty, accepting shares
1767         # This too should place all the shares, and this too should fail,
1768         # but since the effective happiness is more than the k encoding
1769         # parameter, it should trigger a different error message than the one
1770         # above.
1771         d.addCallback(_reset)
1772         d.addCallback(lambda ign:
1773             self._setup_grid())
1774         d.addCallback(lambda ign:
1775             self._add_server(server_number=1))
1776         d.addCallback(lambda ign:
1777             self._add_server(server_number=2))
1778         d.addCallback(lambda ign:
1779             self._add_server(server_number=3))
1780         d.addCallback(lambda ign:
1781             self._add_server(server_number=4))
1782         d.addCallback(_reset_encoding_parameters, happy=7)
1783         d.addCallback(lambda client:
1784             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1785                             "shares could be placed on only 5 server(s) such "
1786                             "that any 3 of them have enough shares to recover "
1787                             "the file, but we were asked to place shares on "
1788                             "at least 7 such servers.",
1789                             client.upload, upload.Data("data" * 10000,
1790                                                        convergence="")))
1791         # server 0: shares 0 - 9
1792         # server 1: share 0, read-only
1793         # server 2: share 0, read-only
1794         # server 3: share 0, read-only
1795         # This should place all of the shares, but fail with happy=4.
1796         # Since the number of servers with shares is more than the number
1797         # necessary to reconstitute the file, this will trigger a different
1798         # error message than either of those above.
1799         d.addCallback(_reset)
1800         d.addCallback(lambda ign:
1801             self._setup_and_upload())
1802         d.addCallback(lambda ign:
1803             self._add_server_with_share(server_number=1, share_number=0,
1804                                         readonly=True))
1805         d.addCallback(lambda ign:
1806             self._add_server_with_share(server_number=2, share_number=0,
1807                                         readonly=True))
1808         d.addCallback(lambda ign:
1809             self._add_server_with_share(server_number=3, share_number=0,
1810                                         readonly=True))
1811         d.addCallback(_reset_encoding_parameters, happy=7)
1812         d.addCallback(lambda client:
1813             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1814                             "shares could be placed or found on 4 server(s), "
1815                             "but they are not spread out evenly enough to "
1816                             "ensure that any 3 of these servers would have "
1817                             "enough shares to recover the file. We were asked "
1818                             "to place shares on at least 7 servers such that "
1819                             "any 3 of them have enough shares to recover the "
1820                             "file",
1821                             client.upload, upload.Data("data" * 10000,
1822                                                        convergence="")))
1823         return d
1824
1825
1826     def test_problem_layout_comment_187(self):
1827         # The layout in #778 comment 187 broke an initial attempt at a
1828         # share redistribution algorithm. This test demonstrates the
1829         # breakage, and checks that subsequent algorithms don't break
1830         # in the same way.
1831         self.basedir = self.mktemp()
1832         d = self._setup_and_upload(k=2, n=3)
1833
1834         # server 1: shares 0, 1, 2, readonly
1835         # server 2: share 0, readonly
1836         # server 3: share 0
1837         def _setup(ign):
1838             self._add_server_with_share(server_number=1, share_number=0,
1839                                         readonly=True)
1840             self._add_server_with_share(server_number=2, share_number=0,
1841                                         readonly=True)
1842             self._add_server_with_share(server_number=3, share_number=0)
1843             # Copy shares
1844             self._copy_share_to_server(1, 1)
1845             self._copy_share_to_server(2, 1)
1846             # Remove server 0
1847             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1848             client = self.g.clients[0]
1849             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 3
1850             return client
1851
1852         d.addCallback(_setup)
1853         d.addCallback(lambda client:
1854             client.upload(upload.Data("data" * 10000, convergence="")))
1855         d.addCallback(lambda ign:
1856             self.failUnless(self._has_happy_share_distribution()))
1857         return d
1858     test_problem_layout_comment_187.todo = "this isn't fixed yet"
1859
1860     def test_problem_layout_ticket_1118(self):
1861         # #1118 includes a report from a user who hit an assertion in
1862         # the upload code with this layout.
1863         self.basedir = self.mktemp()
1864         d = self._setup_and_upload(k=2, n=4)
1865
1866         # server 0: no shares
1867         # server 1: shares 0, 3
1868         # server 3: share 1
1869         # server 2: share 2
1870         # The order in which they get queried is 0, 1, 3, 2.
1871         def _setup(ign):
1872             self._add_server(server_number=0)
1873             self._add_server_with_share(server_number=1, share_number=0)
1874             self._add_server_with_share(server_number=2, share_number=2)
1875             self._add_server_with_share(server_number=3, share_number=1)
1876             # Copy shares
1877             self._copy_share_to_server(3, 1)
1878             storedir = self.get_serverdir(0)
1879             # remove the storedir, wiping out any existing shares
1880             shutil.rmtree(storedir)
1881             # create an empty storedir to replace the one we just removed
1882             os.mkdir(storedir)
1883             client = self.g.clients[0]
1884             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1885             return client
1886
1887         d.addCallback(_setup)
1888         # Note: actually it should succeed! See
1889         # test_problem_layout_ticket_1128. But ticket 1118 is just to
1890         # make it realize that it has failed, so if it raises
1891         # UploadUnhappinessError then we'll give it the green light
1892         # for now.
1893         d.addCallback(lambda ignored:
1894             self.shouldFail(UploadUnhappinessError,
1895                             "test_problem_layout_ticket_1118",
1896                             "",
1897                             self.g.clients[0].upload, upload.Data("data" * 10000,
1898                                                        convergence="")))
1899         return d
1900
1901     def test_problem_layout_ticket_1128(self):
1902         # #1128 covers the same layout as #1118, but unlike #1118 the
1903         # upload here should actually succeed (see the .todo below).
1904         self.basedir = self.mktemp()
1905         d = self._setup_and_upload(k=2, n=4)
1906
1907         # server 0: no shares
1908         # server 1: shares 0, 3
1909         # server 3: share 1
1910         # server 2: share 2
1911         # The order in which they get queried is 0, 1, 3, 2.
1912         def _setup(ign):
1913             self._add_server(server_number=0)
1914             self._add_server_with_share(server_number=1, share_number=0)
1915             self._add_server_with_share(server_number=2, share_number=2)
1916             self._add_server_with_share(server_number=3, share_number=1)
1917             # Copy shares
1918             self._copy_share_to_server(3, 1)
1919             storedir = self.get_serverdir(0)
1920             # remove the storedir, wiping out any existing shares
1921             shutil.rmtree(storedir)
1922             # create an empty storedir to replace the one we just removed
1923             os.mkdir(storedir)
1924             client = self.g.clients[0]
1925             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1926             return client
1927
1928         d.addCallback(_setup)
1929         d.addCallback(lambda client:
1930                           client.upload(upload.Data("data" * 10000, convergence="")))
1931         d.addCallback(lambda ign:
1932             self.failUnless(self._has_happy_share_distribution()))
1933         return d
1934     test_problem_layout_ticket_1128.todo = "Invent a smarter uploader that uploads successfully in this case."
1935
1936     def test_upload_succeeds_with_some_homeless_shares(self):
1937         # If the upload is forced to stop trying to place shares before
1938         # it has placed (or otherwise accounted for) all of them, but it
1939         # has placed enough to satisfy the upload health criteria that
1940         # we're using, it should still succeed.
1941         self.basedir = self.mktemp()
1942         d = self._setup_and_upload()
1943         def _server_setup(ign):
1944             # Add four servers so that we have a layout like this:
1945             # server 1: share 0, read-only
1946             # server 2: share 1, read-only
1947             # server 3: share 2, read-only
1948             # server 4: share 3, read-only
1949             # If we set happy = 4, the upload will manage to satisfy
1950             # servers of happiness, but not place all of the shares; we
1951             # want to test that the upload is declared successful in
1952             # this case.
1953             self._add_server_with_share(server_number=1, share_number=0,
1954                                         readonly=True)
1955             self._add_server_with_share(server_number=2, share_number=1,
1956                                         readonly=True)
1957             self._add_server_with_share(server_number=3, share_number=2,
1958                                         readonly=True)
1959             self._add_server_with_share(server_number=4, share_number=3,
1960                                         readonly=True)
1961             # Remove server 0.
1962             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1963             # Set the client appropriately
1964             c = self.g.clients[0]
1965             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1966             return c
1967         d.addCallback(_server_setup)
1968         d.addCallback(lambda client:
1969             client.upload(upload.Data("data" * 10000, convergence="")))
1970         d.addCallback(lambda ign:
1971             self.failUnless(self._has_happy_share_distribution()))
1972         return d
1973
1974
1975     def test_uploader_skips_over_servers_with_only_one_share(self):
1976         # We want to make sure that the redistribution logic ignores
1977         # servers with only one share, since placing these shares
1978         # elsewhere will at best keep happiness the same as it was, and
1979         # at worst hurt it.
1980         self.basedir = self.mktemp()
1981         d = self._setup_and_upload()
1982         def _server_setup(ign):
1983             # Add some servers so that the upload will need to
1984             # redistribute, but will first pass over a couple of servers
1985             # that don't have enough shares to redistribute before
1986             # finding one that does have shares to redistribute.
1987             self._add_server_with_share(server_number=1, share_number=0)
1988             self._add_server_with_share(server_number=2, share_number=2)
1989             self._add_server_with_share(server_number=3, share_number=1)
1990             self._add_server_with_share(server_number=8, share_number=4)
1991             self._add_server_with_share(server_number=5, share_number=5)
1992             self._add_server_with_share(server_number=10, share_number=7)
1993             for i in xrange(4):
1994                 self._copy_share_to_server(i, 2)
1995             return self.g.clients[0]
1996         d.addCallback(_server_setup)
1997         d.addCallback(lambda client:
1998             client.upload(upload.Data("data" * 10000, convergence="")))
1999         d.addCallback(lambda ign:
2000             self.failUnless(self._has_happy_share_distribution()))
2001         return d
2002
2003
2004     def test_server_selector_bucket_abort(self):
2005         # If server selection for an upload fails due to an unhappy
2006         # layout, the server selection process should abort the buckets it
2007         # allocates before failing, so that the space can be re-used.
2008         self.basedir = self.mktemp()
2009         self.set_up_grid(num_servers=5)
2010
2011         # Try to upload a file with happy=7, which is unsatisfiable with
2012         # the current grid. This will fail, but should not take up any
2013         # space on the storage servers after it fails.
2014         client = self.g.clients[0]
2015         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
2016         d = defer.succeed(None)
2017         d.addCallback(lambda ignored:
2018             self.shouldFail(UploadUnhappinessError,
2019                             "test_server_selection_bucket_abort",
2020                             "",
2021                             client.upload, upload.Data("data" * 10000,
2022                                                        convergence="")))
2023         # wait for the abort messages to get there.
2024         def _turn_barrier(res):
2025             return fireEventually(res)
2026         d.addCallback(_turn_barrier)
2027         def _then(ignored):
2028             for server in self.g.servers_by_number.values():
2029                 self.failUnlessEqual(server.allocated_size(), 0)
2030         d.addCallback(_then)
2031         return d
2032
2033
2034     def test_encoder_bucket_abort(self):
2035         # If enough servers die in the process of encoding and uploading
2036         # a file to make the layout unhappy, we should cancel the
2037         # newly-allocated buckets before dying.
2038         self.basedir = self.mktemp()
2039         self.set_up_grid(num_servers=4)
2040
2041         client = self.g.clients[0]
2042         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
2043
2044         d = defer.succeed(None)
2045         d.addCallback(lambda ignored:
2046             self.shouldFail(UploadUnhappinessError,
2047                             "test_encoder_bucket_abort",
2048                             "",
2049                             self._do_upload_with_broken_servers, 1))
2050         def _turn_barrier(res):
2051             return fireEventually(res)
2052         d.addCallback(_turn_barrier)
2053         def _then(ignored):
2054             for server in self.g.servers_by_number.values():
2055                 self.failUnlessEqual(server.allocated_size(), 0)
2056         d.addCallback(_then)
2057         return d
2058
2059
2060     def _set_up_nodes_extra_config(self, clientdir):
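        # Append a [client] section so that nodes built from this config
        # encode each file into 12 shares, any 7 of which are enough to
        # recover it (shares.needed = k, shares.total = n).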
2061         cfgfn = os.path.join(clientdir, "tahoe.cfg")
2062         oldcfg = open(cfgfn, "r").read()
2063         f = open(cfgfn, "wt")
2064         f.write(oldcfg)
2065         f.write("\n")
2066         f.write("[client]\n")
2067         f.write("shares.needed = 7\n")
2068         f.write("shares.total = 12\n")
2069         f.write("\n")
2070         f.close()
2071         return None
2072
2073 # TODO:
2074 #  upload with exactly 75 servers (servers_of_happiness)
2075 #  have a download fail
2076 #  cancel a download (need to implement more cancel stuff)
2077
2078 # from test_encode:
2079 # NoNetworkGrid, upload part of ciphertext, kill server, continue upload
2080 # check with Kevan: these want to live in test_upload; existing tests might cover them
2081 #     def test_lost_one_shareholder(self): # these are upload-side tests
2082 #     def test_lost_one_shareholder_early(self):
2083 #     def test_lost_many_shareholders(self):
2084 #     def test_lost_all_shareholders(self):