1 # -*- coding: utf-8 -*-
2
3 import os, shutil
4 from cStringIO import StringIO
5 from twisted.trial import unittest
6 from twisted.python.failure import Failure
7 from twisted.internet import defer
8 from foolscap.api import fireEventually
9
10 import allmydata # for __full_version__
11 from allmydata import uri, monitor, client
12 from allmydata.immutable import upload, encode
13 from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
14 from allmydata.util import log, base32
15 from allmydata.util.assertutil import precondition
16 from allmydata.util.deferredutil import DeferredListShouldSucceed
17 from allmydata.test.no_network import GridTestMixin
18 from allmydata.test.common_util import ShouldFailMixin
19 from allmydata.util.happinessutil import servers_of_happiness, \
20                                          shares_by_server, merge_servers
21 from allmydata.storage_client import StorageFarmBroker
22 from allmydata.storage.server import storage_index_to_dir
23
24 MiB = 1024*1024
25
26 def extract_uri(results):
27     return results.uri
28
29 # Some of these took longer than 480 seconds on Zandr's arm box, but this may
30 # have been due to an earlier test ERROR'ing out due to timeout, which seems
31 # to screw up subsequent tests.
32 timeout = 960
33
34 class Uploadable(unittest.TestCase):
35     def shouldEqual(self, data, expected):
36         self.failUnless(isinstance(data, list))
37         for e in data:
38             self.failUnless(isinstance(e, str))
39         s = "".join(data)
40         self.failUnlessEqual(s, expected)
41
42     def test_filehandle_random_key(self):
43         return self._test_filehandle(convergence=None)
44
45     def test_filehandle_convergent_encryption(self):
46         return self._test_filehandle(convergence="some convergence string")
47
48     def _test_filehandle(self, convergence):
49         s = StringIO("a"*41)
50         u = upload.FileHandle(s, convergence=convergence)
51         d = u.get_size()
52         d.addCallback(self.failUnlessEqual, 41)
53         d.addCallback(lambda res: u.read(1))
54         d.addCallback(self.shouldEqual, "a")
55         d.addCallback(lambda res: u.read(80))
56         d.addCallback(self.shouldEqual, "a"*40)
57         d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
58         d.addCallback(lambda res: s.close()) # that privilege is reserved for us
59         return d
60
61     def test_filename(self):
62         basedir = "upload/Uploadable/test_filename"
63         os.makedirs(basedir)
64         fn = os.path.join(basedir, "file")
65         f = open(fn, "w")
66         f.write("a"*41)
67         f.close()
68         u = upload.FileName(fn, convergence=None)
69         d = u.get_size()
70         d.addCallback(self.failUnlessEqual, 41)
71         d.addCallback(lambda res: u.read(1))
72         d.addCallback(self.shouldEqual, "a")
73         d.addCallback(lambda res: u.read(80))
74         d.addCallback(self.shouldEqual, "a"*40)
75         d.addCallback(lambda res: u.close())
76         return d
77
78     def test_data(self):
79         s = "a"*41
80         u = upload.Data(s, convergence=None)
81         d = u.get_size()
82         d.addCallback(self.failUnlessEqual, 41)
83         d.addCallback(lambda res: u.read(1))
84         d.addCallback(self.shouldEqual, "a")
85         d.addCallback(lambda res: u.read(80))
86         d.addCallback(self.shouldEqual, "a"*40)
87         d.addCallback(lambda res: u.close())
88         return d
89
90 class ServerError(Exception):
91     pass
92
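# Mixin that overrides the fake client's default erasure-coding parameters:
# k is the number of shares needed to reconstruct the file, n is the total
# number of shares produced, and "happy" is the servers-of-happiness
# threshold that the upload must satisfy.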
93 class SetDEPMixin:
94     def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
95         p = {"k": k,
96              "happy": happy,
97              "n": n,
98              "max_segment_size": max_segsize,
99              }
100         self.node.DEFAULT_ENCODING_PARAMETERS = p
101
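# A stub storage server whose behaviour is selected by 'mode':
#   "good"             - accept every allocation request
#   "full"             - accept nothing (no bucket writers returned)
#   "already got them" - claim to already hold every requested share
#   "small"            - advertise a 10-byte maximum immutable share size
#   "first-fail" / "second-fail" - raise ServerError on the first / second
#                        allocate_buckets() query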
102 class FakeStorageServer:
103     def __init__(self, mode):
104         self.mode = mode
105         self.allocated = []
106         self.queries = 0
107         self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
108                          { "maximum-immutable-share-size": 2**32 },
109                          "application-version": str(allmydata.__full_version__),
110                          }
111         if mode == "small":
112             self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
113                              { "maximum-immutable-share-size": 10 },
114                              "application-version": str(allmydata.__full_version__),
115                              }
116
117
118     def callRemote(self, methname, *args, **kwargs):
119         def _call():
120             meth = getattr(self, methname)
121             return meth(*args, **kwargs)
122         d = fireEventually()
123         d.addCallback(lambda res: _call())
124         return d
125
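    # Mimics the storage server's allocate_buckets() API: returns a tuple of
    # (alreadygot, bucketwriters), where 'alreadygot' is the set of share
    # numbers the server claims to already hold and 'bucketwriters' maps
    # newly allocated share numbers to FakeBucketWriter instances.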
126     def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
127                          sharenums, share_size, canary):
128         #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
129         if self.mode == "first-fail":
130             if self.queries == 0:
131                 raise ServerError
132         if self.mode == "second-fail":
133             if self.queries == 1:
134                 raise ServerError
135         self.queries += 1
136         if self.mode == "full":
137             return (set(), {},)
138         elif self.mode == "already got them":
139             return (set(sharenums), {},)
140         else:
141             for shnum in sharenums:
142                 self.allocated.append( (storage_index, shnum) )
143             return (set(),
144                     dict([( shnum, FakeBucketWriter(share_size) )
145                           for shnum in sharenums]),
146                     )
147
148 class FakeBucketWriter:
149     # a diagnostic version of storageserver.BucketWriter
150     def __init__(self, size):
151         self.data = StringIO()
152         self.closed = False
153         self._size = size
154
155     def callRemote(self, methname, *args, **kwargs):
156         def _call():
157             meth = getattr(self, "remote_" + methname)
158             return meth(*args, **kwargs)
159         d = fireEventually()
160         d.addCallback(lambda res: _call())
161         return d
162
163
164     def callRemoteOnly(self, methname, *args, **kwargs):
165         d = self.callRemote(methname, *args, **kwargs)
166         del d # callRemoteOnly ignores this
167         return None
168
169
170     def remote_write(self, offset, data):
171         precondition(not self.closed)
172         precondition(offset >= 0)
173         precondition(offset+len(data) <= self._size,
174                      "offset=%d + data=%d > size=%d" %
175                      (offset, len(data), self._size))
176         self.data.seek(offset)
177         self.data.write(data)
178
179     def remote_close(self):
180         precondition(not self.closed)
181         self.closed = True
182
183     def remote_abort(self):
184         pass
185
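# A stand-in for the client node: builds a StorageFarmBroker populated with
# FakeStorageServers. 'mode' may be a single string applied to every server,
# or a dict mapping server index to a per-server mode.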
186 class FakeClient:
187     DEFAULT_ENCODING_PARAMETERS = {"k":25,
188                                    "happy": 25,
189                                    "n": 100,
190                                    "max_segment_size": 1*MiB,
191                                    }
192     def __init__(self, mode="good", num_servers=50):
193         self.num_servers = num_servers
194         if type(mode) is str:
195             mode = dict([i,mode] for i in range(num_servers))
196         servers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
197                     for fakeid in range(self.num_servers) ]
198         self.storage_broker = StorageFarmBroker(None, permute_peers=True)
199         for (serverid, rref) in servers:
200             ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid),
201                    "permutation-seed-base32": base32.b2a(serverid) }
202             self.storage_broker.test_add_rref(serverid, rref, ann)
203         self.last_servers = [s[1] for s in servers]
204
205     def log(self, *args, **kwargs):
206         pass
207     def get_encoding_parameters(self):
208         return self.DEFAULT_ENCODING_PARAMETERS
209     def get_storage_broker(self):
210         return self.storage_broker
211     _secret_holder = client.SecretHolder("lease secret", "convergence secret")
212
213 class GotTooFarError(Exception):
214     pass
215
216 class GiganticUploadable(upload.FileHandle):
217     def __init__(self, size):
218         self._size = size
219         self._fp = 0
220
221     def get_encryption_key(self):
222         return defer.succeed("\x00" * 16)
223     def get_size(self):
224         return defer.succeed(self._size)
225     def read(self, length):
226         left = self._size - self._fp
227         length = min(left, length)
228         self._fp += length
229         if self._fp > 1000000:
230             # terminate the test early.
231             raise GotTooFarError("we shouldn't be allowed to get this far")
232         return defer.succeed(["\x00" * length])
233     def close(self):
234         pass
235
236 DATA = """
237 Once upon a time, there was a beautiful princess named Buttercup. She lived
238 in a magical land where every file was stored securely among millions of
239 machines, and nobody ever worried about their data being lost ever again.
240 The End.
241 """
242 assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
243
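# Files at or below upload.Uploader.URI_LIT_SIZE_THRESHOLD are stored as LIT
# (literal) URIs, with the data embedded directly in the URI; larger files
# become normal CHK uploads. SIZE_ZERO/SIZE_SMALL exercise the former,
# SIZE_LARGE the latter.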
244 SIZE_ZERO = 0
245 SIZE_SMALL = 16
246 SIZE_LARGE = len(DATA)
247
248 def upload_data(uploader, data):
249     u = upload.Data(data, convergence=None)
250     return uploader.upload(u)
251 def upload_filename(uploader, filename):
252     u = upload.FileName(filename, convergence=None)
253     return uploader.upload(u)
254 def upload_filehandle(uploader, fh):
255     u = upload.FileHandle(fh, convergence=None)
256     return uploader.upload(u)
257
258 class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
259     def setUp(self):
260         self.node = FakeClient(mode="good")
261         self.u = upload.Uploader()
262         self.u.running = True
263         self.u.parent = self.node
264
265     def _check_small(self, newuri, size):
266         u = uri.from_string(newuri)
267         self.failUnless(isinstance(u, uri.LiteralFileURI))
268         self.failUnlessEqual(len(u.data), size)
269
270     def _check_large(self, newuri, size):
271         u = uri.from_string(newuri)
272         self.failUnless(isinstance(u, uri.CHKFileURI))
273         self.failUnless(isinstance(u.get_storage_index(), str))
274         self.failUnlessEqual(len(u.get_storage_index()), 16)
275         self.failUnless(isinstance(u.key, str))
276         self.failUnlessEqual(len(u.key), 16)
277         self.failUnlessEqual(u.size, size)
278
279     def get_data(self, size):
280         return DATA[:size]
281
282     def test_too_large(self):
283         # we've removed the 4GiB share size limit (see ticket #346 for
284         # details), but still have an 8-byte field, so the limit is now
285         # 2**64, so make sure we reject files larger than that.
286         k = 3; happy = 7; n = 10
287         self.set_encoding_parameters(k, happy, n)
288         big = k*(2**64)
289         data1 = GiganticUploadable(big)
290         d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
291                             "This file is too large to be uploaded (data_size)",
292                             self.u.upload, data1)
293         data2 = GiganticUploadable(big-3)
294         d.addCallback(lambda res:
295                       self.shouldFail(FileTooLargeError,
296                                       "test_too_large-data2",
297                                       "This file is too large to be uploaded (offsets)",
298                                       self.u.upload, data2))
299         # I don't know where the actual limit is.. it depends upon how large
300         # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
301         return d
302
303     def test_data_zero(self):
304         data = self.get_data(SIZE_ZERO)
305         d = upload_data(self.u, data)
306         d.addCallback(extract_uri)
307         d.addCallback(self._check_small, SIZE_ZERO)
308         return d
309
310     def test_data_small(self):
311         data = self.get_data(SIZE_SMALL)
312         d = upload_data(self.u, data)
313         d.addCallback(extract_uri)
314         d.addCallback(self._check_small, SIZE_SMALL)
315         return d
316
317     def test_data_large(self):
318         data = self.get_data(SIZE_LARGE)
319         d = upload_data(self.u, data)
320         d.addCallback(extract_uri)
321         d.addCallback(self._check_large, SIZE_LARGE)
322         return d
323
324     def test_data_large_odd_segments(self):
325         data = self.get_data(SIZE_LARGE)
326         segsize = int(SIZE_LARGE / 2.5)
327         # we want 3 segments, since that's not a power of two
328         self.set_encoding_parameters(25, 25, 100, segsize)
329         d = upload_data(self.u, data)
330         d.addCallback(extract_uri)
331         d.addCallback(self._check_large, SIZE_LARGE)
332         return d
333
334     def test_filehandle_zero(self):
335         data = self.get_data(SIZE_ZERO)
336         d = upload_filehandle(self.u, StringIO(data))
337         d.addCallback(extract_uri)
338         d.addCallback(self._check_small, SIZE_ZERO)
339         return d
340
341     def test_filehandle_small(self):
342         data = self.get_data(SIZE_SMALL)
343         d = upload_filehandle(self.u, StringIO(data))
344         d.addCallback(extract_uri)
345         d.addCallback(self._check_small, SIZE_SMALL)
346         return d
347
348     def test_filehandle_large(self):
349         data = self.get_data(SIZE_LARGE)
350         d = upload_filehandle(self.u, StringIO(data))
351         d.addCallback(extract_uri)
352         d.addCallback(self._check_large, SIZE_LARGE)
353         return d
354
355     def test_filename_zero(self):
356         fn = "Uploader-test_filename_zero.data"
357         f = open(fn, "wb")
358         data = self.get_data(SIZE_ZERO)
359         f.write(data)
360         f.close()
361         d = upload_filename(self.u, fn)
362         d.addCallback(extract_uri)
363         d.addCallback(self._check_small, SIZE_ZERO)
364         return d
365
366     def test_filename_small(self):
367         fn = "Uploader-test_filename_small.data"
368         f = open(fn, "wb")
369         data = self.get_data(SIZE_SMALL)
370         f.write(data)
371         f.close()
372         d = upload_filename(self.u, fn)
373         d.addCallback(extract_uri)
374         d.addCallback(self._check_small, SIZE_SMALL)
375         return d
376
377     def test_filename_large(self):
378         fn = "Uploader-test_filename_large.data"
379         f = open(fn, "wb")
380         data = self.get_data(SIZE_LARGE)
381         f.write(data)
382         f.close()
383         d = upload_filename(self.u, fn)
384         d.addCallback(extract_uri)
385         d.addCallback(self._check_large, SIZE_LARGE)
386         return d
387
388 class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
389     def make_node(self, mode, num_servers=10):
390         self.node = FakeClient(mode, num_servers)
391         self.u = upload.Uploader()
392         self.u.running = True
393         self.u.parent = self.node
394
395     def _check_large(self, newuri, size):
396         u = uri.from_string(newuri)
397         self.failUnless(isinstance(u, uri.CHKFileURI))
398         self.failUnless(isinstance(u.get_storage_index(), str))
399         self.failUnlessEqual(len(u.get_storage_index()), 16)
400         self.failUnless(isinstance(u.key, str))
401         self.failUnlessEqual(len(u.key), 16)
402         self.failUnlessEqual(u.size, size)
403
404     def test_first_error(self):
405         mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
406         self.make_node(mode)
407         self.set_encoding_parameters(k=25, happy=1, n=50)
408         d = upload_data(self.u, DATA)
409         d.addCallback(extract_uri)
410         d.addCallback(self._check_large, SIZE_LARGE)
411         return d
412
413     def test_first_error_all(self):
414         self.make_node("first-fail")
415         d = self.shouldFail(UploadUnhappinessError, "first_error_all",
416                             "server selection failed",
417                             upload_data, self.u, DATA)
418         def _check((f,)):
419             self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
420             # there should also be a 'last failure was' message
421             self.failUnlessIn("ServerError", str(f.value))
422         d.addCallback(_check)
423         return d
424
425     def test_second_error(self):
426         # we want to make sure we make it to a third pass. This means that
427         # the first pass was insufficient to place all shares, and at least
428         # one of the second-pass servers (other than the last one) accepted a
429         # share (so we'll believe that a third pass will be useful). (if
430         # everyone but the last server throws an error, then we'll send all
431         # the remaining shares to the last server at the end of the second
432         # pass, and if that succeeds, we won't make it to a third pass).
433         #
434         # we can achieve this 97.5% of the time by using 40 servers, having
435         # 39 of them fail on the second request, leaving only one to succeed
436         # on the second request. (we need to keep the number of servers low
437         # enough to ensure a second pass with 100 shares).
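        # (The 97.5% figure is presumably 39/40: the chance that the single
        # "good" server does not end up last in the permuted server list.)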
438         mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
439         self.make_node(mode, 40)
440         d = upload_data(self.u, DATA)
441         d.addCallback(extract_uri)
442         d.addCallback(self._check_large, SIZE_LARGE)
443         return d
444
445     def test_second_error_all(self):
446         self.make_node("second-fail")
447         d = self.shouldFail(UploadUnhappinessError, "second_error_all",
448                             "server selection failed",
449                             upload_data, self.u, DATA)
450         def _check((f,)):
451             self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
452             # there should also be a 'last failure was' message
453             self.failUnlessIn("ServerError", str(f.value))
454         d.addCallback(_check)
455         return d
456
457 class FullServer(unittest.TestCase):
458     def setUp(self):
459         self.node = FakeClient(mode="full")
460         self.u = upload.Uploader()
461         self.u.running = True
462         self.u.parent = self.node
463
464     def _should_fail(self, f):
465         self.failUnless(isinstance(f, Failure) and f.check(UploadUnhappinessError), f)
466
467     def test_data_large(self):
468         data = DATA
469         d = upload_data(self.u, data)
470         d.addBoth(self._should_fail)
471         return d
472
473 class ServerSelection(unittest.TestCase):
474
475     def make_client(self, num_servers=50):
476         self.node = FakeClient(mode="good", num_servers=num_servers)
477         self.u = upload.Uploader()
478         self.u.running = True
479         self.u.parent = self.node
480
481     def get_data(self, size):
482         return DATA[:size]
483
484     def _check_large(self, newuri, size):
485         u = uri.from_string(newuri)
486         self.failUnless(isinstance(u, uri.CHKFileURI))
487         self.failUnless(isinstance(u.get_storage_index(), str))
488         self.failUnlessEqual(len(u.get_storage_index()), 16)
489         self.failUnless(isinstance(u.key, str))
490         self.failUnlessEqual(len(u.key), 16)
491         self.failUnlessEqual(u.size, size)
492
493     def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
494         p = {"k": k,
495              "happy": happy,
496              "n": n,
497              "max_segment_size": max_segsize,
498              }
499         self.node.DEFAULT_ENCODING_PARAMETERS = p
500
501     def test_one_each(self):
502         # if we have 50 shares, and there are 50 servers, and they all accept
503         # a share, we should get exactly one share per server
504
505         self.make_client()
506         data = self.get_data(SIZE_LARGE)
507         self.set_encoding_parameters(25, 30, 50)
508         d = upload_data(self.u, data)
509         d.addCallback(extract_uri)
510         d.addCallback(self._check_large, SIZE_LARGE)
511         def _check(res):
512             for s in self.node.last_servers:
513                 allocated = s.allocated
514                 self.failUnlessEqual(len(allocated), 1)
515                 self.failUnlessEqual(s.queries, 1)
516         d.addCallback(_check)
517         return d
518
519     def test_two_each(self):
520         # if we have 100 shares, and there are 50 servers, and they all
521         # accept all shares, we should get exactly two shares per server
522
523         self.make_client()
524         data = self.get_data(SIZE_LARGE)
525         # if there are 50 servers, then happy needs to be <= 50
526         self.set_encoding_parameters(50, 50, 100)
527         d = upload_data(self.u, data)
528         d.addCallback(extract_uri)
529         d.addCallback(self._check_large, SIZE_LARGE)
530         def _check(res):
531             for s in self.node.last_servers:
532                 allocated = s.allocated
533                 self.failUnlessEqual(len(allocated), 2)
534                 self.failUnlessEqual(s.queries, 2)
535         d.addCallback(_check)
536         return d
537
538     def test_one_each_plus_one_extra(self):
539         # if we have 51 shares, and there are 50 servers, then one server
540         # gets two shares and the rest get just one
541
542         self.make_client()
543         data = self.get_data(SIZE_LARGE)
544         self.set_encoding_parameters(24, 41, 51)
545         d = upload_data(self.u, data)
546         d.addCallback(extract_uri)
547         d.addCallback(self._check_large, SIZE_LARGE)
548         def _check(res):
549             got_one = []
550             got_two = []
551             for s in self.node.last_servers:
552                 allocated = s.allocated
553                 self.failUnless(len(allocated) in (1,2), len(allocated))
554                 if len(allocated) == 1:
555                     self.failUnlessEqual(s.queries, 1)
556                     got_one.append(s)
557                 else:
558                     self.failUnlessEqual(s.queries, 2)
559                     got_two.append(s)
560             self.failUnlessEqual(len(got_one), 49)
561             self.failUnlessEqual(len(got_two), 1)
562         d.addCallback(_check)
563         return d
564
565     def test_four_each(self):
566         # if we have 200 shares, and there are 50 servers, then each server
567         # gets 4 shares. The design goal is to accomplish this with only two
568         # queries per server.
569
570         self.make_client()
571         data = self.get_data(SIZE_LARGE)
572         # if there are 50 servers, then happy should be no more than 50 if we
573         # want this to work.
574         self.set_encoding_parameters(100, 50, 200)
575         d = upload_data(self.u, data)
576         d.addCallback(extract_uri)
577         d.addCallback(self._check_large, SIZE_LARGE)
578         def _check(res):
579             for s in self.node.last_servers:
580                 allocated = s.allocated
581                 self.failUnlessEqual(len(allocated), 4)
582                 self.failUnlessEqual(s.queries, 2)
583         d.addCallback(_check)
584         return d
585
586     def test_three_of_ten(self):
587         # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
588         # 4+4+2
589
590         self.make_client(3)
591         data = self.get_data(SIZE_LARGE)
592         self.set_encoding_parameters(3, 3, 10)
593         d = upload_data(self.u, data)
594         d.addCallback(extract_uri)
595         d.addCallback(self._check_large, SIZE_LARGE)
596         def _check(res):
597             counts = {}
598             for s in self.node.last_servers:
599                 allocated = s.allocated
600                 counts[len(allocated)] = counts.get(len(allocated), 0) + 1
601             histogram = [counts.get(i, 0) for i in range(5)]
602             self.failUnlessEqual(histogram, [0,0,0,2,1])
603         d.addCallback(_check)
604         return d
605
606     def test_some_big_some_small(self):
607         # 10 shares, 20 servers, but half the servers don't support a
608         # share-size large enough for our file
609         mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)])
610         self.node = FakeClient(mode, num_servers=20)
611         self.u = upload.Uploader()
612         self.u.running = True
613         self.u.parent = self.node
614
615         data = self.get_data(SIZE_LARGE)
616         self.set_encoding_parameters(3, 5, 10)
617         d = upload_data(self.u, data)
618         d.addCallback(extract_uri)
619         d.addCallback(self._check_large, SIZE_LARGE)
620         def _check(res):
621             # we should have put one share each on the big servers, and zero
622             # shares on the small servers
623             total_allocated = 0
624             for p in self.node.last_servers:
625                 if p.mode == "good":
626                     self.failUnlessEqual(len(p.allocated), 1)
627                 elif p.mode == "small":
628                     self.failUnlessEqual(len(p.allocated), 0)
629                 total_allocated += len(p.allocated)
630             self.failUnlessEqual(total_allocated, 10)
631         d.addCallback(_check)
632         return d
633
634
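# With CHK (convergent) encryption, the encryption key -- and hence the
# storage index -- is derived from the file contents, the convergence secret,
# and the encoding parameters, so identical inputs yield identical storage
# indexes. Passing convergence=None requests a fresh random key instead.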
635 class StorageIndex(unittest.TestCase):
636     def test_params_must_matter(self):
637         DATA = "I am some data"
638         u = upload.Data(DATA, convergence="")
639         eu = upload.EncryptAnUploadable(u)
640         d1 = eu.get_storage_index()
641
642         # CHK means the same data should encrypt the same way
643         u = upload.Data(DATA, convergence="")
644         eu = upload.EncryptAnUploadable(u)
645         d1a = eu.get_storage_index()
646
647         # but if we use a different convergence string it should be different
648         u = upload.Data(DATA, convergence="wheee!")
649         eu = upload.EncryptAnUploadable(u)
650         d1salt1 = eu.get_storage_index()
651
652         # and if we add yet a different convergence it should be different again
653         u = upload.Data(DATA, convergence="NOT wheee!")
654         eu = upload.EncryptAnUploadable(u)
655         d1salt2 = eu.get_storage_index()
656
657         # and if we use the first string again it should be the same as last time
658         u = upload.Data(DATA, convergence="wheee!")
659         eu = upload.EncryptAnUploadable(u)
660         d1salt1a = eu.get_storage_index()
661
662         # and if we change the encoding parameters, it should be different (from the same convergence string with different encoding parameters)
663         u = upload.Data(DATA, convergence="")
664         u.encoding_param_k = u.default_encoding_param_k + 1
665         eu = upload.EncryptAnUploadable(u)
666         d2 = eu.get_storage_index()
667
668         # and if we use a random key, it should be different than the CHK
669         u = upload.Data(DATA, convergence=None)
670         eu = upload.EncryptAnUploadable(u)
671         d3 = eu.get_storage_index()
672         # and different from another instance
673         u = upload.Data(DATA, convergence=None)
674         eu = upload.EncryptAnUploadable(u)
675         d4 = eu.get_storage_index()
676
677         d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
678         def _done(res):
679             si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
680             self.failUnlessEqual(si1, si1a)
681             self.failIfEqual(si1, si2)
682             self.failIfEqual(si1, si3)
683             self.failIfEqual(si1, si4)
684             self.failIfEqual(si3, si4)
685             self.failIfEqual(si1salt1, si1)
686             self.failIfEqual(si1salt1, si1salt2)
687             self.failIfEqual(si1salt2, si1)
688             self.failUnlessEqual(si1salt1, si1salt1a)
689         d.addCallback(_done)
690         return d
691
692 # copied from python docs because itertools.combinations was added in
693 # python 2.6 and we support >= 2.4.
694 def combinations(iterable, r):
695     # combinations('ABCD', 2) --> AB AC AD BC BD CD
696     # combinations(range(4), 3) --> 012 013 023 123
697     pool = tuple(iterable)
698     n = len(pool)
699     if r > n:
700         return
701     indices = range(r)
702     yield tuple(pool[i] for i in indices)
703     while True:
704         for i in reversed(range(r)):
705             if indices[i] != i + n - r:
706                 break
707         else:
708             return
709         indices[i] += 1
710         for j in range(i+1, r):
711             indices[j] = indices[j-1] + 1
712         yield tuple(pool[i] for i in indices)
713
714 def is_happy_enough(servertoshnums, h, k):
715     """ I calculate whether servertoshnums achieves happiness level h. I do this with a naïve "brute force search" approach. (See src/allmydata/util/happinessutil.py for a better algorithm.) """
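    # For example, with k=2 and h=3, {A: set([0]), B: set([1]), C: set([0, 1])}
    # is happy enough (any 2 of the 3 servers cover 2 distinct shares), whereas
    # {A: set([0]), B: set([0]), C: set([0])} is not.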
716     if len(servertoshnums) < h:
717         return False
718     # print "servertoshnums: ", servertoshnums, h, k
719     for happysetcombo in combinations(servertoshnums.iterkeys(), h):
720         # print "happysetcombo: ", happysetcombo
721         for subsetcombo in combinations(happysetcombo, k):
722             shnums = reduce(set.union, [ servertoshnums[s] for s in subsetcombo ])
723             # print "subsetcombo: ", subsetcombo, ", shnums: ", shnums
724             if len(shnums) < k:
725                 # print "NOT HAPPY", shnums, k
726                 return False
727     # print "HAPPY"
728     return True
729
730 class FakeServerTracker:
731     def __init__(self, serverid, buckets):
732         self._serverid = serverid
733         self.buckets = buckets
734     def get_serverid(self):
735         return self._serverid
736
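# Exercises the servers-of-happiness upload semantics against a NoNetworkGrid:
# shares are copied between server directories to build specific layouts, and
# uploads are then expected to succeed or fail according to the 'happy'
# parameter.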
737 class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
738     ShouldFailMixin):
739     def find_all_shares(self, unused=None):
740         """Locate shares on disk. Returns a dict that maps
741         server to set of sharenums.
742         """
743         assert self.g, "I tried to find a grid at self.g, but failed"
744         servertoshnums = {} # k: server, v: set(shnum)
745
746         for i, c in self.g.servers_by_number.iteritems():
747             for (dirp, dirns, fns) in os.walk(c.sharedir):
748                 for fn in fns:
749                     try:
750                         sharenum = int(fn)
751                     except ValueError:
752                         # Whoops, I guess that's not a share file then.
753                         pass
754                     else:
755                         servertoshnums.setdefault(i, set()).add(sharenum)
756
757         return servertoshnums
758
759     def _do_upload_with_broken_servers(self, servers_to_break):
760         """
761         I act like a normal upload, but before I send the results of
762         Tahoe2ServerSelector to the Encoder, I break the first
763         servers_to_break ServerTrackers in the upload_servers part of the
764         return result.
765         """
766         assert self.g, "I tried to find a grid at self.g, but failed"
767         broker = self.g.clients[0].storage_broker
768         sh     = self.g.clients[0]._secret_holder
769         data = upload.Data("data" * 10000, convergence="")
770         data.encoding_param_k = 3
771         data.encoding_param_happy = 4
772         data.encoding_param_n = 10
773         uploadable = upload.EncryptAnUploadable(data)
774         encoder = encode.Encoder()
775         encoder.set_encrypted_uploadable(uploadable)
776         status = upload.UploadStatus()
777         selector = upload.Tahoe2ServerSelector("dglev", "test", status)
778         storage_index = encoder.get_param("storage_index")
779         share_size = encoder.get_param("share_size")
780         block_size = encoder.get_param("block_size")
781         num_segments = encoder.get_param("num_segments")
782         d = selector.get_shareholders(broker, sh, storage_index,
783                                       share_size, block_size, num_segments,
784                                       10, 3, 4)
785         def _have_shareholders((upload_trackers, already_servers)):
786             assert servers_to_break <= len(upload_trackers)
787             for index in xrange(servers_to_break):
788                 tracker = list(upload_trackers)[index]
789                 for share in tracker.buckets.keys():
790                     tracker.buckets[share].abort()
791             buckets = {}
792             servermap = already_servers.copy()
793             for tracker in upload_trackers:
794                 buckets.update(tracker.buckets)
795                 for bucket in tracker.buckets:
796                     servermap.setdefault(bucket, set()).add(tracker.get_serverid())
797             encoder.set_shareholders(buckets, servermap)
798             d = encoder.start()
799             return d
800         d.addCallback(_have_shareholders)
801         return d
802
803     def _has_happy_share_distribution(self):
804         servertoshnums = self.find_all_shares()
805         k = self.g.clients[0].DEFAULT_ENCODING_PARAMETERS['k']
806         h = self.g.clients[0].DEFAULT_ENCODING_PARAMETERS['happy']
807         return is_happy_enough(servertoshnums, h, k)
808
809     def _add_server(self, server_number, readonly=False):
810         assert self.g, "I tried to find a grid at self.g, but failed"
811         ss = self.g.make_server(server_number, readonly)
812         log.msg("just created a server, number: %s => %s" % (server_number, ss,))
813         self.g.add_server(server_number, ss)
814
815     def _add_server_with_share(self, server_number, share_number=None,
816                                readonly=False):
817         self._add_server(server_number, readonly)
818         if share_number is not None:
819             self._copy_share_to_server(share_number, server_number)
820
821
822     def _copy_share_to_server(self, share_number, server_number):
823         ss = self.g.servers_by_number[server_number]
824         # Copy share i from the directory associated with the first
825         # storage server to the directory associated with this one.
826         assert self.g, "I tried to find a grid at self.g, but failed"
827         assert self.shares, "I tried to find shares at self.shares, but failed"
828         old_share_location = self.shares[share_number][2]
829         new_share_location = os.path.join(ss.storedir, "shares")
830         si = uri.from_string(self.uri).get_storage_index()
831         new_share_location = os.path.join(new_share_location,
832                                           storage_index_to_dir(si))
833         if not os.path.exists(new_share_location):
834             os.makedirs(new_share_location)
835         new_share_location = os.path.join(new_share_location,
836                                           str(share_number))
837         if old_share_location != new_share_location:
838             shutil.copy(old_share_location, new_share_location)
839         shares = self.find_uri_shares(self.uri)
840         # Make sure that the storage server has the share.
841         self.failUnless((share_number, ss.my_nodeid, new_share_location)
842                         in shares)
843
844     def _setup_grid(self):
845         """
846         I set up a NoNetworkGrid with a single server and client.
847         """
848         self.set_up_grid(num_clients=1, num_servers=1)
849
850     def _setup_and_upload(self, **kwargs):
851         """
852         I set up a NoNetworkGrid with a single server and client,
853         upload a file to it, store its uri in self.uri, and store its
854         sharedata in self.shares.
855         """
856         self._setup_grid()
857         client = self.g.clients[0]
858         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
859         if "n" in kwargs and "k" in kwargs:
860             client.DEFAULT_ENCODING_PARAMETERS['k'] = kwargs['k']
861             client.DEFAULT_ENCODING_PARAMETERS['n'] = kwargs['n']
862         data = upload.Data("data" * 10000, convergence="")
863         self.data = data
864         d = client.upload(data)
865         def _store_uri(ur):
866             self.uri = ur.uri
867         d.addCallback(_store_uri)
868         d.addCallback(lambda ign:
869             self.find_uri_shares(self.uri))
870         def _store_shares(shares):
871             self.shares = shares
872         d.addCallback(_store_shares)
873         return d
874
875     def test_configure_parameters(self):
876         self.basedir = self.mktemp()
877         hooks = {0: self._set_up_nodes_extra_config}
878         self.set_up_grid(client_config_hooks=hooks)
879         c0 = self.g.clients[0]
880
881         DATA = "data" * 100
882         u = upload.Data(DATA, convergence="")
883         d = c0.upload(u)
884         d.addCallback(lambda ur: c0.create_node_from_uri(ur.uri))
885         m = monitor.Monitor()
886         d.addCallback(lambda fn: fn.check(m))
887         def _check(cr):
888             data = cr.get_data()
889             self.failUnlessEqual(data["count-shares-needed"], 7)
890             self.failUnlessEqual(data["count-shares-expected"], 12)
891         d.addCallback(_check)
892         return d
893
894
895     def _setUp(self, ns):
896         # Used by test_happy_semantics and test_preexisting_share_behavior
897         # to set up the grid.
898         self.node = FakeClient(mode="good", num_servers=ns)
899         self.u = upload.Uploader()
900         self.u.running = True
901         self.u.parent = self.node
902
903
904     def test_happy_semantics(self):
905         self._setUp(2)
906         DATA = upload.Data("kittens" * 10000, convergence="")
907         # These parameters are unsatisfiable with only 2 servers.
908         self.set_encoding_parameters(k=3, happy=5, n=10)
909         d = self.shouldFail(UploadUnhappinessError, "test_happy_semantics",
910                             "shares could be placed or found on only 2 "
911                             "server(s). We were asked to place shares on "
912                             "at least 5 server(s) such that any 3 of them "
913                             "have enough shares to recover the file",
914                             self.u.upload, DATA)
915         # Let's reset the client to have 10 servers
916         d.addCallback(lambda ign:
917             self._setUp(10))
918         # These parameters are satisfiable with 10 servers.
919         d.addCallback(lambda ign:
920             self.set_encoding_parameters(k=3, happy=5, n=10))
921         d.addCallback(lambda ign:
922             self.u.upload(DATA))
923         # Let's reset the client to have 7 servers
924         # (this is less than n, but more than h)
925         d.addCallback(lambda ign:
926             self._setUp(7))
927         # These parameters are satisfiable with 7 servers.
928         d.addCallback(lambda ign:
929             self.set_encoding_parameters(k=3, happy=5, n=10))
930         d.addCallback(lambda ign:
931             self.u.upload(DATA))
932         return d
933
934     def test_aborted_shares(self):
935         self.basedir = "upload/EncodingParameters/aborted_shares"
936         self.set_up_grid(num_servers=4)
937         c = self.g.clients[0]
938         DATA = upload.Data(100* "kittens", convergence="")
939         # These parameters are unsatisfiable with only 4 servers, but should
940         # work with 5, as long as the original 4 are not stuck in the open
941         # BucketWriter state (open() but not closed).
942         parms = {"k":2, "happy":5, "n":5, "max_segment_size": 1*MiB}
943         c.DEFAULT_ENCODING_PARAMETERS = parms
944         d = self.shouldFail(UploadUnhappinessError, "test_aborted_shares",
945                             "shares could be placed on only 4 "
946                             "server(s) such that any 2 of them have enough "
947                             "shares to recover the file, but we were asked "
948                             "to place shares on at least 5 such servers",
949                             c.upload, DATA)
950         # now add the 5th server
951         d.addCallback(lambda ign: self._add_server(4, False))
952         # and this time the upload ought to succeed
953         d.addCallback(lambda ign: c.upload(DATA))
954         d.addCallback(lambda ign:
955             self.failUnless(self._has_happy_share_distribution()))
956         return d
957
958
959     def test_problem_layout_comment_52(self):
960         def _basedir():
961             self.basedir = self.mktemp()
962         _basedir()
963         # This scenario is at
964         # http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:52
965         #
966         # The scenario in comment:52 proposes that we have a layout
967         # like:
968         # server 0: shares 1 - 9
969         # server 1: share 0, read-only
970         # server 2: share 0, read-only
971         # server 3: share 0, read-only
972         # To get access to the shares, we will first upload to one
973         # server, which will then have shares 0 - 9. We'll then
974         # add three new servers, configure them to not accept any new
975         # shares, then write share 0 directly into the serverdir of each,
976         # and then remove share 0 from server 0 in the same way.
977         # Then each of servers 1 - 3 will report that they have share 0,
978         # and will not accept any new share, while server 0 will report that
979         # it has shares 1 - 9 and will accept new shares.
980         # We'll then set 'happy' = 4, and see that an upload fails
981         # (as it should)
982         d = self._setup_and_upload()
983         d.addCallback(lambda ign:
984             self._add_server_with_share(server_number=1, share_number=0,
985                                         readonly=True))
986         d.addCallback(lambda ign:
987             self._add_server_with_share(server_number=2, share_number=0,
988                                         readonly=True))
989         d.addCallback(lambda ign:
990             self._add_server_with_share(server_number=3, share_number=0,
991                                         readonly=True))
992         # Remove the first share from server 0.
993         def _remove_share_0_from_server_0():
994             share_location = self.shares[0][2]
995             os.remove(share_location)
996         d.addCallback(lambda ign:
997             _remove_share_0_from_server_0())
998         # Set happy = 4 in the client.
999         def _prepare():
1000             client = self.g.clients[0]
1001             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1002             return client
1003         d.addCallback(lambda ign:
1004             _prepare())
1005         # Uploading data should fail
1006         d.addCallback(lambda client:
1007             self.shouldFail(UploadUnhappinessError,
1008                             "test_problem_layout_comment_52_test_1",
1009                             "shares could be placed or found on 4 server(s), "
1010                             "but they are not spread out evenly enough to "
1011                             "ensure that any 3 of these servers would have "
1012                             "enough shares to recover the file. "
1013                             "We were asked to place shares on at "
1014                             "least 4 servers such that any 3 of them have "
1015                             "enough shares to recover the file",
1016                             client.upload, upload.Data("data" * 10000,
1017                                                        convergence="")))
1018
1019         # Do comment:52, but like this:
1020         # server 2: empty
1021         # server 3: share 0, read-only
1022         # server 1: share 0, read-only
1023         # server 0: shares 0-9
1024         d.addCallback(lambda ign:
1025             _basedir())
1026         d.addCallback(lambda ign:
1027             self._setup_and_upload())
1028         d.addCallback(lambda ign:
1029             self._add_server(server_number=2))
1030         d.addCallback(lambda ign:
1031             self._add_server_with_share(server_number=3, share_number=0,
1032                                         readonly=True))
1033         d.addCallback(lambda ign:
1034             self._add_server_with_share(server_number=1, share_number=0,
1035                                         readonly=True))
1036         def _prepare2():
1037             client = self.g.clients[0]
1038             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1039             return client
1040         d.addCallback(lambda ign:
1041             _prepare2())
1042         d.addCallback(lambda client:
1043             self.shouldFail(UploadUnhappinessError,
1044                             "test_problem_layout_comment_52_test_2",
1045                             "shares could be placed on only 3 server(s) such "
1046                             "that any 3 of them have enough shares to recover "
1047                             "the file, but we were asked to place shares on "
1048                             "at least 4 such servers.",
1049                             client.upload, upload.Data("data" * 10000,
1050                                                        convergence="")))
1051         return d
1052
1053
1054     def test_problem_layout_comment_53(self):
1055         # This scenario is at
1056         # http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:53
1057         #
1058         # Set up the grid to have one server
1059         def _change_basedir(ign):
1060             self.basedir = self.mktemp()
1061         _change_basedir(None)
1062         # We start by uploading all of the shares to one server.
1063         # Next, we'll add three new servers to our NoNetworkGrid. We'll add
1064         # one share from our initial upload to each of these.
1065         # The counterintuitive ordering of the share numbers is to deal with
1066         # the permuting of these servers -- distributing the shares this
1067         # way ensures that the Tahoe2ServerSelector sees them in the order
1068         # described below.
1069         d = self._setup_and_upload()
1070         d.addCallback(lambda ign:
1071             self._add_server_with_share(server_number=1, share_number=2))
1072         d.addCallback(lambda ign:
1073             self._add_server_with_share(server_number=2, share_number=0))
1074         d.addCallback(lambda ign:
1075             self._add_server_with_share(server_number=3, share_number=1))
1076         # So, we now have the following layout:
1077         # server 0: shares 0 - 9
1078         # server 1: share 2
1079         # server 2: share 0
1080         # server 3: share 1
1081         # We change the 'happy' parameter in the client to 4.
1082         # The Tahoe2ServerSelector will see the servers permuted as:
1083         # 2, 3, 1, 0
1084         # Ideally, a reupload of our original data should work.
1085         def _reset_encoding_parameters(ign, happy=4):
1086             client = self.g.clients[0]
1087             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1088             return client
1089         d.addCallback(_reset_encoding_parameters)
1090         d.addCallback(lambda client:
1091             client.upload(upload.Data("data" * 10000, convergence="")))
1092         d.addCallback(lambda ign:
1093             self.failUnless(self._has_happy_share_distribution()))
1094
1095
1096         # This scenario is basically comment:53, but changed so that the
1097         # Tahoe2ServerSelector sees the server with all of the shares before
1098         # any of the other servers.
1099         # The layout is:
1100         # server 2: shares 0 - 9
1101         # server 3: share 0
1102         # server 1: share 1
1103         # server 4: share 2
1104         # The Tahoe2ServerSelector sees the servers permuted as:
1105         # 2, 3, 1, 4
1106         # Note that server 0 has been replaced by server 4; this makes it
1107         # easier to ensure that the last server seen by Tahoe2ServerSelector
1108         # has only one share.
1109         d.addCallback(_change_basedir)
1110         d.addCallback(lambda ign:
1111             self._setup_and_upload())
1112         d.addCallback(lambda ign:
1113             self._add_server_with_share(server_number=2, share_number=0))
1114         d.addCallback(lambda ign:
1115             self._add_server_with_share(server_number=3, share_number=1))
1116         d.addCallback(lambda ign:
1117             self._add_server_with_share(server_number=1, share_number=2))
1118         # Copy all of the other shares to server number 2
1119         def _copy_shares(ign):
1120             for i in xrange(0, 10):
1121                 self._copy_share_to_server(i, 2)
1122         d.addCallback(_copy_shares)
1123         # Remove the first server, and add a placeholder with share 0
1124         d.addCallback(lambda ign:
1125             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1126         d.addCallback(lambda ign:
1127             self._add_server_with_share(server_number=4, share_number=0))
1128         # Now try uploading.
1129         d.addCallback(_reset_encoding_parameters)
1130         d.addCallback(lambda client:
1131             client.upload(upload.Data("data" * 10000, convergence="")))
1132         d.addCallback(lambda ign:
1133             self.failUnless(self._has_happy_share_distribution()))
1134
1135
1136         # Try the same thing, but with empty servers after the first one
1137         # We want to make sure that Tahoe2ServerSelector will redistribute
1138         # shares as necessary, not simply discover an existing layout.
1139         # The layout is:
1140         # server 2: shares 0 - 9
1141         # server 3: empty
1142         # server 1: empty
1143         # server 4: empty
1144         d.addCallback(_change_basedir)
1145         d.addCallback(lambda ign:
1146             self._setup_and_upload())
1147         d.addCallback(lambda ign:
1148             self._add_server(server_number=2))
1149         d.addCallback(lambda ign:
1150             self._add_server(server_number=3))
1151         d.addCallback(lambda ign:
1152             self._add_server(server_number=1))
1153         d.addCallback(lambda ign:
1154             self._add_server(server_number=4))
1155         d.addCallback(_copy_shares)
1156         d.addCallback(lambda ign:
1157             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1158         d.addCallback(_reset_encoding_parameters)
1159         d.addCallback(lambda client:
1160             client.upload(upload.Data("data" * 10000, convergence="")))
1161         # Make sure that only as many shares as necessary to satisfy
1162         # servers of happiness were pushed.
1163         d.addCallback(lambda results:
1164             self.failUnlessEqual(results.pushed_shares, 3))
1165         d.addCallback(lambda ign:
1166             self.failUnless(self._has_happy_share_distribution()))
1167         return d
1168
1169     def test_problem_layout_ticket_1124(self):
1170         self.basedir = self.mktemp()
1171         d = self._setup_and_upload(k=2, n=4)
1172
1173         # server 0: shares 0, 1, 2, 3
1174         # server 1: shares 0, 3
1175         # server 2: share 1
1176         # server 3: share 2
1177         # With this layout, an upload should just be satisfied that the current distribution is good enough, right?
1178         def _setup(ign):
1179             self._add_server_with_share(server_number=0, share_number=None)
1180             self._add_server_with_share(server_number=1, share_number=0)
1181             self._add_server_with_share(server_number=2, share_number=1)
1182             self._add_server_with_share(server_number=3, share_number=2)
1183             # Copy shares
1184             self._copy_share_to_server(3, 1)
1185             client = self.g.clients[0]
1186             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1187             return client
1188
1189         d.addCallback(_setup)
1190         d.addCallback(lambda client:
1191             client.upload(upload.Data("data" * 10000, convergence="")))
1192         d.addCallback(lambda ign:
1193             self.failUnless(self._has_happy_share_distribution()))
1194         return d
1195     test_problem_layout_ticket_1124.todo = "Fix this after 1.7.1 release."
1196
1197     def test_happiness_with_some_readonly_servers(self):
1198         # Try the following layout
1199         # server 2: shares 0-9
1200         # server 4: share 0, read-only
1201         # server 3: share 1, read-only
1202         # server 1: share 2, read-only
1203         self.basedir = self.mktemp()
1204         d = self._setup_and_upload()
1205         d.addCallback(lambda ign:
1206             self._add_server_with_share(server_number=2, share_number=0))
1207         d.addCallback(lambda ign:
1208             self._add_server_with_share(server_number=3, share_number=1,
1209                                         readonly=True))
1210         d.addCallback(lambda ign:
1211             self._add_server_with_share(server_number=1, share_number=2,
1212                                         readonly=True))
1213         # Copy all of the other shares to server number 2
1214         def _copy_shares(ign):
1215             for i in xrange(1, 10):
1216                 self._copy_share_to_server(i, 2)
1217         d.addCallback(_copy_shares)
1218         # Remove server 0, and add another in its place
1219         d.addCallback(lambda ign:
1220             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1221         d.addCallback(lambda ign:
1222             self._add_server_with_share(server_number=4, share_number=0,
1223                                         readonly=True))
1224         def _reset_encoding_parameters(ign, happy=4):
1225             client = self.g.clients[0]
1226             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1227             return client
1228         d.addCallback(_reset_encoding_parameters)
1229         d.addCallback(lambda client:
1230             client.upload(upload.Data("data" * 10000, convergence="")))
1231         d.addCallback(lambda ign:
1232             self.failUnless(self._has_happy_share_distribution()))
1233         return d
1234
1235
1236     def test_happiness_with_all_readonly_servers(self):
1237         # server 3: share 1, read-only
1238         # server 1: share 2, read-only
1239         # server 2: shares 0-9, read-only
1240         # server 4: share 0, read-only
1241         # The idea with this test is to make sure that the survey of
1242         # read-only servers doesn't undercount servers of happiness
1243         self.basedir = self.mktemp()
1244         d = self._setup_and_upload()
1245         d.addCallback(lambda ign:
1246             self._add_server_with_share(server_number=4, share_number=0,
1247                                         readonly=True))
1248         d.addCallback(lambda ign:
1249             self._add_server_with_share(server_number=3, share_number=1,
1250                                         readonly=True))
1251         d.addCallback(lambda ign:
1252             self._add_server_with_share(server_number=1, share_number=2,
1253                                         readonly=True))
1254         d.addCallback(lambda ign:
1255             self._add_server_with_share(server_number=2, share_number=0,
1256                                         readonly=True))
1257         def _copy_shares(ign):
1258             for i in xrange(1, 10):
1259                 self._copy_share_to_server(i, 2)
1260         d.addCallback(_copy_shares)
1261         d.addCallback(lambda ign:
1262             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1263         def _reset_encoding_parameters(ign, happy=4):
1264             client = self.g.clients[0]
1265             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1266             return client
1267         d.addCallback(_reset_encoding_parameters)
1268         d.addCallback(lambda client:
1269             client.upload(upload.Data("data" * 10000, convergence="")))
1270         d.addCallback(lambda ign:
1271             self.failUnless(self._has_happy_share_distribution()))
1272         return d
1273
1274
1275     def test_dropped_servers_in_encoder(self):
1276         # The Encoder does its own "servers_of_happiness" check if it
1277         # happens to lose a bucket during an upload (it assumes that
1278         # the layout presented to it satisfies "servers_of_happiness"
1279         # until a failure occurs)
1280         #
1281         # This test simulates an upload where servers break after server
1282         # selection, but before they are written to.
1283         def _set_basedir(ign=None):
1284             self.basedir = self.mktemp()
1285         _set_basedir()
1286         d = self._setup_and_upload()
1287         # Add 5 servers
1288         def _do_server_setup(ign):
1289             self._add_server(server_number=1)
1290             self._add_server(server_number=2)
1291             self._add_server(server_number=3)
1292             self._add_server(server_number=4)
1293             self._add_server(server_number=5)
1294         d.addCallback(_do_server_setup)
1295         # remove the original server
1296         # (necessary to ensure that the Tahoe2ServerSelector will distribute
1297         #  all the shares)
1298         def _remove_server(ign):
1299             server = self.g.servers_by_number[0]
1300             self.g.remove_server(server.my_nodeid)
1301         d.addCallback(_remove_server)
1302         # This should succeed; we still have 4 servers, and the
1303         # happiness of the upload is 4.
1304         d.addCallback(lambda ign:
1305             self._do_upload_with_broken_servers(1))
1306         # Now, do the same thing over again, but drop 2 servers instead
1307         # of 1. This should fail, because servers_of_happiness is 4 and
1308         # we can't satisfy that.
1309         d.addCallback(_set_basedir)
1310         d.addCallback(lambda ign:
1311             self._setup_and_upload())
1312         d.addCallback(_do_server_setup)
1313         d.addCallback(_remove_server)
1314         d.addCallback(lambda ign:
1315             self.shouldFail(UploadUnhappinessError,
1316                             "test_dropped_servers_in_encoder",
1317                             "shares could be placed on only 3 server(s) "
1318                             "such that any 3 of them have enough shares to "
1319                             "recover the file, but we were asked to place "
1320                             "shares on at least 4",
1321                             self._do_upload_with_broken_servers, 2))
1322         # Now do the same thing over again, but make some of the servers
1323         # readonly, break some of the ones that aren't, and make sure that
1324         # happiness accounting is preserved.
1325         d.addCallback(_set_basedir)
1326         d.addCallback(lambda ign:
1327             self._setup_and_upload())
1328         def _do_server_setup_2(ign):
1329             self._add_server(1)
1330             self._add_server(2)
1331             self._add_server(3)
1332             self._add_server_with_share(4, 7, readonly=True)
1333             self._add_server_with_share(5, 8, readonly=True)
1334         d.addCallback(_do_server_setup_2)
1335         d.addCallback(_remove_server)
1336         d.addCallback(lambda ign:
1337             self._do_upload_with_broken_servers(1))
1338         d.addCallback(_set_basedir)
1339         d.addCallback(lambda ign:
1340             self._setup_and_upload())
1341         d.addCallback(_do_server_setup_2)
1342         d.addCallback(_remove_server)
1343         d.addCallback(lambda ign:
1344             self.shouldFail(UploadUnhappinessError,
1345                             "test_dropped_servers_in_encoder",
1346                             "shares could be placed on only 3 server(s) "
1347                             "such that any 3 of them have enough shares to "
1348                             "recover the file, but we were asked to place "
1349                             "shares on at least 4",
1350                             self._do_upload_with_broken_servers, 2))
1351         return d
1352
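    # Illustrative sketch (not part of the original suite): the kind of
    # re-check described at the top of test_dropped_servers_in_encoder.
    # Given a shnum -> set(serverids) map, dropping a server and re-running
    # servers_of_happiness() shows whether the remaining layout still meets
    # the "happy" threshold.  The helper name and signature are made up for
    # illustration; the Encoder's real bookkeeping differs.
    @staticmethod
    def _still_happy_after_losing(sharemap, lost_serverid, happy):
        remaining = {}
        for (shnum, serverids) in sharemap.items():
            keep = serverids - set([lost_serverid])
            if keep:
                remaining[shnum] = keep
        return servers_of_happiness(remaining) >= happy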
1353
1354     def test_merge_servers(self):
1355         # merge_servers merges a set of upload server trackers into a dict
1356         # of shareid -> set(serverid) mappings.
1357         shares = {
1358                     1 : set(["server1"]),
1359                     2 : set(["server2"]),
1360                     3 : set(["server3"]),
1361                     4 : set(["server4", "server5"]),
1362                     5 : set(["server1", "server2"]),
1363                  }
1364         # If not provided with an upload_servers argument, it should just
1365         # return the first argument unchanged.
1366         self.failUnlessEqual(shares, merge_servers(shares, set([])))
1367         trackers = []
1368         for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1369             t = FakeServerTracker(server, [i])
1370             trackers.append(t)
1371         expected = {
1372                     1 : set(["server1"]),
1373                     2 : set(["server2"]),
1374                     3 : set(["server3"]),
1375                     4 : set(["server4", "server5"]),
1376                     5 : set(["server1", "server2", "server5"]),
1377                     6 : set(["server6"]),
1378                     7 : set(["server7"]),
1379                     8 : set(["server8"]),
1380                    }
1381         self.failUnlessEqual(expected, merge_servers(shares, set(trackers)))
1382         shares2 = {}
1383         expected = {
1384                     5 : set(["server5"]),
1385                     6 : set(["server6"]),
1386                     7 : set(["server7"]),
1387                     8 : set(["server8"]),
1388                    }
1389         self.failUnlessEqual(expected, merge_servers(shares2, set(trackers)))
1390         shares3 = {}
1391         trackers = []
1392         expected = {}
1393         for (i, server) in [(i, "server%d" % i) for i in xrange(10)]:
1394             shares3[i] = set([server])
1395             t = FakeServerTracker(server, [i])
1396             trackers.append(t)
1397             expected[i] = set([server])
1398         self.failUnlessEqual(expected, merge_servers(shares3, set(trackers)))
1399
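    # Illustrative sketch (not part of the original suite): the merge that
    # test_merge_servers checks, written out by hand.  Each tracker
    # contributes its buckets (share numbers) under its serverid, and those
    # entries are unioned into an existing shnum -> set(serverids) map.
    # This hand-rolled equivalent takes plain (serverid, buckets) pairs
    # instead of tracker objects; the tests above exercise the real
    # merge_servers() from allmydata.util.happinessutil.
    @staticmethod
    def _merge_by_hand(existing, serverid_bucket_pairs):
        merged = dict([(shnum, set(serverids))
                       for (shnum, serverids) in existing.items()])
        for (serverid, buckets) in serverid_bucket_pairs:
            for shnum in buckets:
                merged.setdefault(shnum, set()).add(serverid)
        return merged
    # For example, _merge_by_hand({5: set(["server1"])}, [("server5", [5])])
    # returns {5: set(["server1", "server5"])}, the same kind of union that
    # gives share 5 its extra "server5" entry in the expected dict above.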
1400
1401     def test_servers_of_happiness_utility_function(self):
1402         # These tests are concerned with the servers_of_happiness()
1403         # utility function, and its underlying matching algorithm. Other
1404         # aspects of the servers_of_happiness behavior are tested
1405         # elsewhere. These tests exist to ensure that
1406         # servers_of_happiness doesn't under- or overcount the happiness
1407         # value for given inputs.
1408
1409         # servers_of_happiness expects a dict of
1410         # shnum => set(serverids) as a preexisting shares argument.
1411         test1 = {
1412                  1 : set(["server1"]),
1413                  2 : set(["server2"]),
1414                  3 : set(["server3"]),
1415                  4 : set(["server4"])
1416                 }
1417         happy = servers_of_happiness(test1)
1418         self.failUnlessEqual(4, happy)
1419         test1[4] = set(["server1"])
1420         # We've added a duplicate server, so now servers_of_happiness
1421         # should be 3 instead of 4.
1422         happy = servers_of_happiness(test1)
1423         self.failUnlessEqual(3, happy)
1424         # The second argument of merge_servers should be a set of objects with
1425         # serverid and buckets as attributes. In actual use, these will be
1426         # ServerTracker instances, but for testing it is fine to make a
1427         # FakeServerTracker whose job is to hold those instance variables to
1428         # test that part.
1429         trackers = []
1430         for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1431             t = FakeServerTracker(server, [i])
1432             trackers.append(t)
1433         # Recall that test1 is a server layout with servers_of_happiness
1434         # = 3.  Since there isn't any overlap between the shnum ->
1435         # set([serverid]) correspondences in test1 and those in trackers,
1436         # the result here should be 7.
1437         test2 = merge_servers(test1, set(trackers))
1438         happy = servers_of_happiness(test2)
1439         self.failUnlessEqual(7, happy)
1440         # Now add an overlapping server to trackers. This is redundant,
1441         # so it should not cause the previously reported happiness value
1442         # to change.
1443         t = FakeServerTracker("server1", [1])
1444         trackers.append(t)
1445         test2 = merge_servers(test1, set(trackers))
1446         happy = servers_of_happiness(test2)
1447         self.failUnlessEqual(7, happy)
1448         test = {}
1449         happy = servers_of_happiness(test)
1450         self.failUnlessEqual(0, happy)
1451         # Test a more substantial overlap between the trackers and the
1452         # existing assignments.
1453         test = {
1454             1 : set(['server1']),
1455             2 : set(['server2']),
1456             3 : set(['server3']),
1457             4 : set(['server4']),
1458         }
1459         trackers = []
1460         t = FakeServerTracker('server5', [4])
1461         trackers.append(t)
1462         t = FakeServerTracker('server6', [3, 5])
1463         trackers.append(t)
1464         # The value returned by servers_of_happiness is the size
1465         # of a maximum matching in the bipartite graph that
1466         # servers_of_happiness() makes between serverids and share
1467         # numbers. It should find something like this:
1468         # (server 1, share 1)
1469         # (server 2, share 2)
1470         # (server 3, share 3)
1471         # (server 5, share 4)
1472         # (server 6, share 5)
1473         #
1474         # and, since there are 5 edges in this matching, it should
1475         # return 5.
1476         test2 = merge_servers(test, set(trackers))
1477         happy = servers_of_happiness(test2)
1478         self.failUnlessEqual(5, happy)
1479         # Zooko's first puzzle:
1480         # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:156)
1481         #
1482         # server 1: shares 0, 1
1483         # server 2: shares 1, 2
1484         # server 3: share 2
1485         #
1486         # This should yield happiness of 3.
1487         test = {
1488             0 : set(['server1']),
1489             1 : set(['server1', 'server2']),
1490             2 : set(['server2', 'server3']),
1491         }
1492         self.failUnlessEqual(3, servers_of_happiness(test))
1493         # Zooko's second puzzle:
1494         # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:158)
1495         #
1496         # server 1: shares 0, 1
1497         # server 2: share 1
1498         #
1499         # This should yield happiness of 2.
1500         test = {
1501             0 : set(['server1']),
1502             1 : set(['server1', 'server2']),
1503         }
1504         self.failUnlessEqual(2, servers_of_happiness(test))
1505
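    # Illustrative sketch (not part of the original suite): a hand-rolled
    # augmenting-path computation of a maximum matching over the
    # share -> server bipartite graph, of the kind that
    # servers_of_happiness() is described as performing above.  It is only
    # here to make the puzzles above easy to re-check by hand; the real
    # implementation lives in allmydata.util.happinessutil.
    @staticmethod
    def _matching_size(sharemap):
        # sharemap maps shnum -> set(serverids)
        matched_to = {}  # serverid -> shnum currently matched to it
        def _augment(shnum, seen):
            # Try to match shnum, possibly re-routing earlier matches.
            for serverid in sharemap.get(shnum, set()):
                if serverid in seen:
                    continue
                seen.add(serverid)
                if (serverid not in matched_to or
                    _augment(matched_to[serverid], seen)):
                    matched_to[serverid] = shnum
                    return True
            return False
        size = 0
        for shnum in sharemap:
            if _augment(shnum, set()):
                size += 1
        return size
    # For example, _matching_size() returns 3 for Zooko's first puzzle and 2
    # for the second one, agreeing with servers_of_happiness() above.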
1506
1507     def test_shares_by_server(self):
1508         test = dict([(i, set(["server%d" % i])) for i in xrange(1, 5)])
1509         sbs = shares_by_server(test)
1510         self.failUnlessEqual(set([1]), sbs["server1"])
1511         self.failUnlessEqual(set([2]), sbs["server2"])
1512         self.failUnlessEqual(set([3]), sbs["server3"])
1513         self.failUnlessEqual(set([4]), sbs["server4"])
1514         test1 = {
1515                     1 : set(["server1"]),
1516                     2 : set(["server1"]),
1517                     3 : set(["server1"]),
1518                     4 : set(["server2"]),
1519                     5 : set(["server2"])
1520                 }
1521         sbs = shares_by_server(test1)
1522         self.failUnlessEqual(set([1, 2, 3]), sbs["server1"])
1523         self.failUnlessEqual(set([4, 5]), sbs["server2"])
1524         # This should fail unless the serverid part of the mapping is a set
1525         test2 = {1: "server1"}
1526         self.shouldFail(AssertionError,
1527                        "test_shares_by_server",
1528                        "",
1529                        shares_by_server, test2)
1530
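    # Illustrative sketch (not part of the original suite): the inversion
    # that shares_by_server() is expected to perform, written out by hand.
    # A shnum -> set(serverids) map becomes serverid -> set(shnums); the
    # tests above exercise the real shares_by_server() from
    # allmydata.util.happinessutil.
    @staticmethod
    def _invert_sharemap(sharemap):
        by_server = {}
        for (shnum, serverids) in sharemap.items():
            for serverid in serverids:
                by_server.setdefault(serverid, set()).add(shnum)
        return by_server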
1531
1532     def test_existing_share_detection(self):
1533         self.basedir = self.mktemp()
1534         d = self._setup_and_upload()
1535         # Our final setup should look like this:
1536         # server 1: shares 0 - 9, read-only
1537         # server 2: empty
1538         # server 3: empty
1539         # server 4: empty
1540         # The purpose of this test is to make sure that the server selector
1541         # knows about the shares on server 1, even though it is read-only.
1542         # It used to simply filter these out, which would cause the test
1543         # to fail when servers_of_happiness = 4.
1544         d.addCallback(lambda ign:
1545             self._add_server_with_share(1, 0, True))
1546         d.addCallback(lambda ign:
1547             self._add_server(2))
1548         d.addCallback(lambda ign:
1549             self._add_server(3))
1550         d.addCallback(lambda ign:
1551             self._add_server(4))
1552         def _copy_shares(ign):
1553             for i in xrange(1, 10):
1554                 self._copy_share_to_server(i, 1)
1555         d.addCallback(_copy_shares)
1556         d.addCallback(lambda ign:
1557             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1558         def _prepare_client(ign):
1559             client = self.g.clients[0]
1560             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1561             return client
1562         d.addCallback(_prepare_client)
1563         d.addCallback(lambda client:
1564             client.upload(upload.Data("data" * 10000, convergence="")))
1565         d.addCallback(lambda ign:
1566             self.failUnless(self._has_happy_share_distribution()))
1567         return d
1568
1569
1570     def test_query_counting(self):
1571         # If server selection fails, Tahoe2ServerSelector prints out a lot
1572         # of helpful diagnostic information, including query stats.
1573         # This test helps make sure that that information is accurate.
1574         self.basedir = self.mktemp()
1575         d = self._setup_and_upload()
1576         def _setup(ign):
1577             for i in xrange(1, 11):
1578                 self._add_server(server_number=i)
1579             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1580             c = self.g.clients[0]
1581             # We set happy to an unsatisfiable value so that we can check the
1582             # counting in the exception message. The same progress message
1583             # is also used when the upload is successful, but in that case it
1584             # only gets written to a log, so we can't see what it says.
1585             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1586             return c
1587         d.addCallback(_setup)
1588         d.addCallback(lambda c:
1589             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1590                             "10 queries placed some shares",
1591                             c.upload, upload.Data("data" * 10000,
1592                                                   convergence="")))
1593         # Now try with some readonly servers. We want to make sure that
1594         # the readonly server share discovery phase is counted correctly.
1595         def _reset(ign):
1596             self.basedir = self.mktemp()
1597             self.g = None
1598         d.addCallback(_reset)
1599         d.addCallback(lambda ign:
1600             self._setup_and_upload())
1601         def _then(ign):
1602             for i in xrange(1, 11):
1603                 self._add_server(server_number=i)
1604             self._add_server(server_number=11, readonly=True)
1605             self._add_server(server_number=12, readonly=True)
1606             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1607             c = self.g.clients[0]
1608             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1609             return c
1610         d.addCallback(_then)
1611         d.addCallback(lambda c:
1612             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1613                             "2 placed none (of which 2 placed none due to "
1614                             "the server being full",
1615                             c.upload, upload.Data("data" * 10000,
1616                                                   convergence="")))
1617         # Now try the case where the upload process finds a bunch of the
1618         # shares that it wants to place on the first server, including
1619         # the one that it wanted to allocate there. Though no shares will
1620         # be allocated in this request, it should still be called
1621             # productive, since it causes some homeless shares to be
1622             # removed.
1623         d.addCallback(_reset)
1624         d.addCallback(lambda ign:
1625             self._setup_and_upload())
1626
1627         def _next(ign):
1628             for i in xrange(1, 11):
1629                 self._add_server(server_number=i)
1630             # Copy all of the shares to server 9, since that will be
1631             # the first one that the selector sees.
1632             for i in xrange(10):
1633                 self._copy_share_to_server(i, 9)
1634             # Remove server 0, and its contents
1635             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1636             # Make happiness unsatisfiable
1637             c = self.g.clients[0]
1638             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1639             return c
1640         d.addCallback(_next)
1641         d.addCallback(lambda c:
1642             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1643                             "1 queries placed some shares",
1644                             c.upload, upload.Data("data" * 10000,
1645                                                   convergence="")))
1646         return d
1647
1648
1649     def test_upper_limit_on_readonly_queries(self):
1650         self.basedir = self.mktemp()
1651         d = self._setup_and_upload()
1652         def _then(ign):
1653             for i in xrange(1, 11):
1654                 self._add_server(server_number=i, readonly=True)
1655             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1656             c = self.g.clients[0]
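            # k and n are lowered here from the 3-of-10 encoding used
            # elsewhere in these tests so that the selector's upper limit on
            # read-only queries (the point of this test) is reached well
            # before all ten read-only servers have been asked; the expected
            # message below checks that only 8 queries were sent.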
1657             c.DEFAULT_ENCODING_PARAMETERS['k'] = 2
1658             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1659             c.DEFAULT_ENCODING_PARAMETERS['n'] = 4
1660             return c
1661         d.addCallback(_then)
1662         d.addCallback(lambda client:
1663             self.shouldFail(UploadUnhappinessError,
1664                             "test_upper_limit_on_readonly_queries",
1665                             "sent 8 queries to 8 servers",
1666                             client.upload,
1667                             upload.Data('data' * 10000, convergence="")))
1668         return d
1669
1670
1671     def test_exception_messages_during_server_selection(self):
1672         # server 1: read-only, no shares
1673         # server 2: read-only, no shares
1674         # server 3: read-only, no shares
1675         # server 4: read-only, no shares
1676         # server 5: read-only, no shares
1677         # This will fail, but we want to make sure that the log messages
1678         # are informative about why it has failed.
1679         self.basedir = self.mktemp()
1680         d = self._setup_and_upload()
1681         d.addCallback(lambda ign:
1682             self._add_server(server_number=1, readonly=True))
1683         d.addCallback(lambda ign:
1684             self._add_server(server_number=2, readonly=True))
1685         d.addCallback(lambda ign:
1686             self._add_server(server_number=3, readonly=True))
1687         d.addCallback(lambda ign:
1688             self._add_server(server_number=4, readonly=True))
1689         d.addCallback(lambda ign:
1690             self._add_server(server_number=5, readonly=True))
1691         d.addCallback(lambda ign:
1692             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1693         def _reset_encoding_parameters(ign, happy=4):
1694             client = self.g.clients[0]
1695             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1696             return client
1697         d.addCallback(_reset_encoding_parameters)
1698         d.addCallback(lambda client:
1699             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1700                             "placed 0 shares out of 10 "
1701                             "total (10 homeless), want to place shares on at "
1702                             "least 4 servers such that any 3 of them have "
1703                             "enough shares to recover the file, "
1704                             "sent 5 queries to 5 servers, 0 queries placed "
1705                             "some shares, 5 placed none "
1706                             "(of which 5 placed none due to the server being "
1707                             "full and 0 placed none due to an error)",
1708                             client.upload,
1709                             upload.Data("data" * 10000, convergence="")))
1710
1711
1712         # server 1: read-only, no shares
1713         # server 2: broken, no shares
1714         # server 3: read-only, no shares
1715         # server 4: read-only, no shares
1716         # server 5: read-only, no shares
1717         def _reset(ign):
1718             self.basedir = self.mktemp()
1719         d.addCallback(_reset)
1720         d.addCallback(lambda ign:
1721             self._setup_and_upload())
1722         d.addCallback(lambda ign:
1723             self._add_server(server_number=1, readonly=True))
1724         d.addCallback(lambda ign:
1725             self._add_server(server_number=2))
1726         def _break_server_2(ign):
1727             serverid = self.g.servers_by_number[2].my_nodeid
1728             self.g.break_server(serverid)
1729         d.addCallback(_break_server_2)
1730         d.addCallback(lambda ign:
1731             self._add_server(server_number=3, readonly=True))
1732         d.addCallback(lambda ign:
1733             self._add_server(server_number=4, readonly=True))
1734         d.addCallback(lambda ign:
1735             self._add_server(server_number=5, readonly=True))
1736         d.addCallback(lambda ign:
1737             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1738         d.addCallback(_reset_encoding_parameters)
1739         d.addCallback(lambda client:
1740             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1741                             "placed 0 shares out of 10 "
1742                             "total (10 homeless), want to place shares on at "
1743                             "least 4 servers such that any 3 of them have "
1744                             "enough shares to recover the file, "
1745                             "sent 5 queries to 5 servers, 0 queries placed "
1746                             "some shares, 5 placed none "
1747                             "(of which 4 placed none due to the server being "
1748                             "full and 1 placed none due to an error)",
1749                             client.upload,
1750                             upload.Data("data" * 10000, convergence="")))
1751         # server 0, server 1 = empty, accepting shares
1752         # This should place all of the shares, but still fail with happy=4.
1753         # We want to make sure that the exception message is worded correctly.
1754         d.addCallback(_reset)
1755         d.addCallback(lambda ign:
1756             self._setup_grid())
1757         d.addCallback(lambda ign:
1758             self._add_server(server_number=1))
1759         d.addCallback(_reset_encoding_parameters)
1760         d.addCallback(lambda client:
1761             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1762                             "shares could be placed or found on only 2 "
1763                             "server(s). We were asked to place shares on at "
1764                             "least 4 server(s) such that any 3 of them have "
1765                             "enough shares to recover the file.",
1766                             client.upload, upload.Data("data" * 10000,
1767                                                        convergence="")))
1768         # servers 0 - 4 = empty, accepting shares
1769         # This too should place all the shares, and this too should fail,
1770         # but since the effective happiness is more than the k encoding
1771         # parameter, it should trigger a different error message than the one
1772         # above.
1773         d.addCallback(_reset)
1774         d.addCallback(lambda ign:
1775             self._setup_grid())
1776         d.addCallback(lambda ign:
1777             self._add_server(server_number=1))
1778         d.addCallback(lambda ign:
1779             self._add_server(server_number=2))
1780         d.addCallback(lambda ign:
1781             self._add_server(server_number=3))
1782         d.addCallback(lambda ign:
1783             self._add_server(server_number=4))
1784         d.addCallback(_reset_encoding_parameters, happy=7)
1785         d.addCallback(lambda client:
1786             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1787                             "shares could be placed on only 5 server(s) such "
1788                             "that any 3 of them have enough shares to recover "
1789                             "the file, but we were asked to place shares on "
1790                             "at least 7 such servers.",
1791                             client.upload, upload.Data("data" * 10000,
1792                                                        convergence="")))
1793         # server 0: shares 0 - 9
1794         # server 1: share 0, read-only
1795         # server 2: share 0, read-only
1796         # server 3: share 0, read-only
1797         # This should place all of the shares, but fail with happy=4.
1798         # Since the number of servers with shares is more than the number
1799         # necessary to reconstitute the file, this will trigger a different
1800         # error message than either of those above.
1801         d.addCallback(_reset)
1802         d.addCallback(lambda ign:
1803             self._setup_and_upload())
1804         d.addCallback(lambda ign:
1805             self._add_server_with_share(server_number=1, share_number=0,
1806                                         readonly=True))
1807         d.addCallback(lambda ign:
1808             self._add_server_with_share(server_number=2, share_number=0,
1809                                         readonly=True))
1810         d.addCallback(lambda ign:
1811             self._add_server_with_share(server_number=3, share_number=0,
1812                                         readonly=True))
1813         d.addCallback(_reset_encoding_parameters, happy=7)
1814         d.addCallback(lambda client:
1815             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1816                             "shares could be placed or found on 4 server(s), "
1817                             "but they are not spread out evenly enough to "
1818                             "ensure that any 3 of these servers would have "
1819                             "enough shares to recover the file. We were asked "
1820                             "to place shares on at least 7 servers such that "
1821                             "any 3 of them have enough shares to recover the "
1822                             "file",
1823                             client.upload, upload.Data("data" * 10000,
1824                                                        convergence="")))
1825         return d
1826
1827
1828     def test_problem_layout_comment_187(self):
1829         # #778 comment 187 broke an initial attempt at a share
1830         # redistribution algorithm. This test is here to demonstrate the
1831         # breakage, and to test that subsequent algorithms don't also
1832         # break in the same way.
1833         self.basedir = self.mktemp()
1834         d = self._setup_and_upload(k=2, n=3)
1835
1836         # server 1: shares 0, 1, 2, readonly
1837         # server 2: share 0, readonly
1838         # server 3: share 0
1839         def _setup(ign):
1840             self._add_server_with_share(server_number=1, share_number=0,
1841                                         readonly=True)
1842             self._add_server_with_share(server_number=2, share_number=0,
1843                                         readonly=True)
1844             self._add_server_with_share(server_number=3, share_number=0)
1845             # Copy shares
1846             self._copy_share_to_server(1, 1)
1847             self._copy_share_to_server(2, 1)
1848             # Remove server 0
1849             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1850             client = self.g.clients[0]
1851             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 3
1852             return client
1853
1854         d.addCallback(_setup)
1855         d.addCallback(lambda client:
1856             client.upload(upload.Data("data" * 10000, convergence="")))
1857         d.addCallback(lambda ign:
1858             self.failUnless(self._has_happy_share_distribution()))
1859         return d
1860     test_problem_layout_comment_187.todo = "this isn't fixed yet"
1861
1862     def test_problem_layout_ticket_1118(self):
1863         # #1118 includes a report from a user who hit an assertion in
1864         # the upload code with this layout.
1865         self.basedir = self.mktemp()
1866         d = self._setup_and_upload(k=2, n=4)
1867
1868         # server 0: no shares
1869         # server 1: shares 0, 3
1870         # server 3: share 1
1871         # server 2: share 2
1872         # The order in which they get queried is 0, 1, 3, 2
1873         def _setup(ign):
1874             self._add_server(server_number=0)
1875             self._add_server_with_share(server_number=1, share_number=0)
1876             self._add_server_with_share(server_number=2, share_number=2)
1877             self._add_server_with_share(server_number=3, share_number=1)
1878             # Copy shares
1879             self._copy_share_to_server(3, 1)
1880             storedir = self.get_serverdir(0)
1881             # remove the storedir, wiping out any existing shares
1882             shutil.rmtree(storedir)
1883             # create an empty storedir to replace the one we just removed
1884             os.mkdir(storedir)
1885             client = self.g.clients[0]
1886             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1887             return client
1888
1889         d.addCallback(_setup)
1890         # Note: actually it should succeed! See
1891         # test_problem_layout_ticket_1128. But ticket 1118 is just to
1892         # make it realize that it has failed, so if it raises
1893         # UploadUnhappinessError then we'll give it the green light
1894         # for now.
1895         d.addCallback(lambda ignored:
1896             self.shouldFail(UploadUnhappinessError,
1897                             "test_problem_layout_ticket_1118",
1898                             "",
1899                             self.g.clients[0].upload, upload.Data("data" * 10000,
1900                                                        convergence="")))
1901         return d
1902
1903     def test_problem_layout_ticket_1128(self):
1904         # Same layout as in test_problem_layout_ticket_1118 (from the user
1905         # report in ticket #1118), but this test expects the upload to succeed.
1906         self.basedir = self.mktemp()
1907         d = self._setup_and_upload(k=2, n=4)
1908
1909         # server 0: no shares
1910         # server 1: shares 0, 3
1911         # server 3: share 1
1912         # server 2: share 2
1913         # The order in which they get queried is 0, 1, 3, 2
1914         def _setup(ign):
1915             self._add_server(server_number=0)
1916             self._add_server_with_share(server_number=1, share_number=0)
1917             self._add_server_with_share(server_number=2, share_number=2)
1918             self._add_server_with_share(server_number=3, share_number=1)
1919             # Copy shares
1920             self._copy_share_to_server(3, 1)
1921             storedir = self.get_serverdir(0)
1922             # remove the storedir, wiping out any existing shares
1923             shutil.rmtree(storedir)
1924             # create an empty storedir to replace the one we just removed
1925             os.mkdir(storedir)
1926             client = self.g.clients[0]
1927             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1928             return client
1929
1930         d.addCallback(_setup)
1931         d.addCallback(lambda client:
1932                           client.upload(upload.Data("data" * 10000, convergence="")))
1933         d.addCallback(lambda ign:
1934             self.failUnless(self._has_happy_share_distribution()))
1935         return d
1936     test_problem_layout_ticket_1128.todo = "Invent a smarter uploader that uploads successfully in this case."
1937
1938     def test_upload_succeeds_with_some_homeless_shares(self):
1939         # If the upload is forced to stop trying to place shares before
1940         # it has placed (or otherwise accounted for) all of them, but it
1941         # has placed enough to satisfy the upload health criteria that
1942         # we're using, it should still succeed.
1943         self.basedir = self.mktemp()
1944         d = self._setup_and_upload()
1945         def _server_setup(ign):
1946             # Add four servers so that we have a layout like this:
1947             # server 1: share 0, read-only
1948             # server 2: share 1, read-only
1949             # server 3: share 2, read-only
1950             # server 4: share 3, read-only
1951             # If we set happy = 4, the upload will manage to satisfy
1952             # servers of happiness, but not place all of the shares; we
1953             # want to test that the upload is declared successful in
1954             # this case.
1955             self._add_server_with_share(server_number=1, share_number=0,
1956                                         readonly=True)
1957             self._add_server_with_share(server_number=2, share_number=1,
1958                                         readonly=True)
1959             self._add_server_with_share(server_number=3, share_number=2,
1960                                         readonly=True)
1961             self._add_server_with_share(server_number=4, share_number=3,
1962                                         readonly=True)
1963             # Remove server 0.
1964             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1965             # Set the client appropriately
1966             c = self.g.clients[0]
1967             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1968             return c
1969         d.addCallback(_server_setup)
1970         d.addCallback(lambda client:
1971             client.upload(upload.Data("data" * 10000, convergence="")))
1972         d.addCallback(lambda ign:
1973             self.failUnless(self._has_happy_share_distribution()))
1974         return d
1975
1976
1977     def test_uploader_skips_over_servers_with_only_one_share(self):
1978         # We want to make sure that the redistribution logic ignores
1979         # servers with only one share, since placing these shares
1980         # elsewhere will at best keep happiness the same as it was, and
1981         # at worst hurt it.
1982         self.basedir = self.mktemp()
1983         d = self._setup_and_upload()
1984         def _server_setup(ign):
1985             # Add some servers so that the upload will need to
1986             # redistribute, but will first pass over a couple of servers
1987             # that don't have enough shares to be worth redistributing
1988             # before finding one that does.
1989             self._add_server_with_share(server_number=1, share_number=0)
1990             self._add_server_with_share(server_number=2, share_number=2)
1991             self._add_server_with_share(server_number=3, share_number=1)
1992             self._add_server_with_share(server_number=8, share_number=4)
1993             self._add_server_with_share(server_number=5, share_number=5)
1994             self._add_server_with_share(server_number=10, share_number=7)
1995             for i in xrange(4):
1996                 self._copy_share_to_server(i, 2)
1997             return self.g.clients[0]
1998         d.addCallback(_server_setup)
1999         d.addCallback(lambda client:
2000             client.upload(upload.Data("data" * 10000, convergence="")))
2001         d.addCallback(lambda ign:
2002             self.failUnless(self._has_happy_share_distribution()))
2003         return d
2004
2005
2006     def test_server_selector_bucket_abort(self):
2007         # If server selection for an upload fails due to an unhappy
2008         # layout, the server selection process should abort the buckets it
2009         # allocates before failing, so that the space can be re-used.
2010         self.basedir = self.mktemp()
2011         self.set_up_grid(num_servers=5)
2012
2013         # Try to upload a file with happy=7, which is unsatisfiable with
2014         # the current grid. This will fail, but should not take up any
2015         # space on the storage servers after it fails.
2016         client = self.g.clients[0]
2017         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
2018         d = defer.succeed(None)
2019         d.addCallback(lambda ignored:
2020             self.shouldFail(UploadUnhappinessError,
2021                             "test_server_selector_bucket_abort",
2022                             "",
2023                             client.upload, upload.Data("data" * 10000,
2024                                                        convergence="")))
2025         # wait for the abort messages to get there.
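        # (fireEventually returns a Deferred that fires on a later reactor
        # turn, so the bucket-abort messages queued by the failed upload get
        # a chance to be delivered before we check allocated_size() below.)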
2026         def _turn_barrier(res):
2027             return fireEventually(res)
2028         d.addCallback(_turn_barrier)
2029         def _then(ignored):
2030             for server in self.g.servers_by_number.values():
2031                 self.failUnlessEqual(server.allocated_size(), 0)
2032         d.addCallback(_then)
2033         return d
2034
2035
2036     def test_encoder_bucket_abort(self):
2037         # If enough servers die in the process of encoding and uploading
2038         # a file to make the layout unhappy, we should cancel the
2039         # newly-allocated buckets before dying.
2040         self.basedir = self.mktemp()
2041         self.set_up_grid(num_servers=4)
2042
2043         client = self.g.clients[0]
2044         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
2045
2046         d = defer.succeed(None)
2047         d.addCallback(lambda ignored:
2048             self.shouldFail(UploadUnhappinessError,
2049                             "test_encoder_bucket_abort",
2050                             "",
2051                             self._do_upload_with_broken_servers, 1))
2052         def _turn_barrier(res):
2053             return fireEventually(res)
2054         d.addCallback(_turn_barrier)
2055         def _then(ignored):
2056             for server in self.g.servers_by_number.values():
2057                 self.failUnlessEqual(server.allocated_size(), 0)
2058         d.addCallback(_then)
2059         return d
2060
2061
2062     def _set_up_nodes_extra_config(self, clientdir):
2063         cfgfn = os.path.join(clientdir, "tahoe.cfg")
2064         oldcfg = open(cfgfn, "r").read()
2065         f = open(cfgfn, "wt")
2066         f.write(oldcfg)
2067         f.write("\n")
2068         f.write("[client]\n")
2069         f.write("shares.needed = 7\n")
2070         f.write("shares.total = 12\n")
2071         f.write("\n")
2072         f.close()
2073         return None
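    # For reference, the section appended to a client's tahoe.cfg by
    # _set_up_nodes_extra_config() above looks like this:
    #
    #   [client]
    #   shares.needed = 7
    #   shares.total = 12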
2074
2075 # TODO:
2076 #  upload with exactly 75 servers (shares_of_happiness)
2077 #  have a download fail
2078 #  cancel a download (need to implement more cancel stuff)
2079
2080 # from test_encode:
2081 # NoNetworkGrid, upload part of ciphertext, kill server, continue upload
2082 # check with Kevan, they want to live in test_upload, existing tests might cover
2083 #     def test_lost_one_shareholder(self): # these are upload-side tests
2084 #     def test_lost_one_shareholder_early(self):
2085 #     def test_lost_many_shareholders(self):
2086 #     def test_lost_all_shareholders(self):