]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_upload.py
20524ef06ea6e7ddd24dcc6c07f792f53354df15
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_upload.py
1 # -*- coding: utf-8 -*-
2
3 import os, shutil
4 from cStringIO import StringIO
5 from twisted.trial import unittest
6 from twisted.python.failure import Failure
7 from twisted.internet import defer
8 from foolscap.api import fireEventually
9
10 import allmydata # for __full_version__
11 from allmydata import uri, monitor, client
12 from allmydata.immutable import upload, encode
13 from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
14 from allmydata.util import log, base32
15 from allmydata.util.assertutil import precondition
16 from allmydata.util.deferredutil import DeferredListShouldSucceed
17 from allmydata.test.no_network import GridTestMixin
18 from allmydata.test.common_util import ShouldFailMixin
19 from allmydata.util.happinessutil import servers_of_happiness, \
20                                          shares_by_server, merge_servers
21 from allmydata.storage_client import StorageFarmBroker
22 from allmydata.storage.server import storage_index_to_dir
23 from allmydata.client import Client
24
25 MiB = 1024*1024
26
27 def extract_uri(results):
28     return results.get_uri()
29
30 # Some of these took longer than 480 seconds on Zandr's arm box, but this may
31 # have been due to an earlier test ERROR'ing out due to timeout, which seems
32 # to screw up subsequent tests.
33 timeout = 960
34
35 class Uploadable(unittest.TestCase):
36     def shouldEqual(self, data, expected):
37         self.failUnless(isinstance(data, list))
38         for e in data:
39             self.failUnless(isinstance(e, str))
40         s = "".join(data)
41         self.failUnlessEqual(s, expected)
42
43     def test_filehandle_random_key(self):
44         return self._test_filehandle(convergence=None)
45
46     def test_filehandle_convergent_encryption(self):
47         return self._test_filehandle(convergence="some convergence string")
48
49     def _test_filehandle(self, convergence):
50         s = StringIO("a"*41)
51         u = upload.FileHandle(s, convergence=convergence)
52         d = u.get_size()
53         d.addCallback(self.failUnlessEqual, 41)
54         d.addCallback(lambda res: u.read(1))
55         d.addCallback(self.shouldEqual, "a")
56         d.addCallback(lambda res: u.read(80))
57         d.addCallback(self.shouldEqual, "a"*40)
58         d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
59         d.addCallback(lambda res: s.close()) # that privilege is reserved for us
60         return d
61
62     def test_filename(self):
63         basedir = "upload/Uploadable/test_filename"
64         os.makedirs(basedir)
65         fn = os.path.join(basedir, "file")
66         f = open(fn, "w")
67         f.write("a"*41)
68         f.close()
69         u = upload.FileName(fn, convergence=None)
70         d = u.get_size()
71         d.addCallback(self.failUnlessEqual, 41)
72         d.addCallback(lambda res: u.read(1))
73         d.addCallback(self.shouldEqual, "a")
74         d.addCallback(lambda res: u.read(80))
75         d.addCallback(self.shouldEqual, "a"*40)
76         d.addCallback(lambda res: u.close())
77         return d
78
79     def test_data(self):
80         s = "a"*41
81         u = upload.Data(s, convergence=None)
82         d = u.get_size()
83         d.addCallback(self.failUnlessEqual, 41)
84         d.addCallback(lambda res: u.read(1))
85         d.addCallback(self.shouldEqual, "a")
86         d.addCallback(lambda res: u.read(80))
87         d.addCallback(self.shouldEqual, "a"*40)
88         d.addCallback(lambda res: u.close())
89         return d
90
91 class ServerError(Exception):
92     pass
93
94 class SetDEPMixin:
95     def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
96         p = {"k": k,
97              "happy": happy,
98              "n": n,
99              "max_segment_size": max_segsize,
100              }
101         self.node.DEFAULT_ENCODING_PARAMETERS = p
102
103 class FakeStorageServer:
104     def __init__(self, mode):
105         self.mode = mode
106         self.allocated = []
107         self.queries = 0
108         self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
109                          { "maximum-immutable-share-size": 2**32 },
110                          "application-version": str(allmydata.__full_version__),
111                          }
112         if mode == "small":
113             self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
114                              { "maximum-immutable-share-size": 10 },
115                              "application-version": str(allmydata.__full_version__),
116                              }
117
118
119     def callRemote(self, methname, *args, **kwargs):
120         def _call():
121             meth = getattr(self, methname)
122             return meth(*args, **kwargs)
123         d = fireEventually()
124         d.addCallback(lambda res: _call())
125         return d
126
127     def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
128                          sharenums, share_size, canary):
129         #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
130         if self.mode == "first-fail":
131             if self.queries == 0:
132                 raise ServerError
133         if self.mode == "second-fail":
134             if self.queries == 1:
135                 raise ServerError
136         self.queries += 1
137         if self.mode == "full":
138             return (set(), {},)
139         elif self.mode == "already got them":
140             return (set(sharenums), {},)
141         else:
142             for shnum in sharenums:
143                 self.allocated.append( (storage_index, shnum) )
144             return (set(),
145                     dict([( shnum, FakeBucketWriter(share_size) )
146                           for shnum in sharenums]),
147                     )
148
149 class FakeBucketWriter:
150     # a diagnostic version of storageserver.BucketWriter
151     def __init__(self, size):
152         self.data = StringIO()
153         self.closed = False
154         self._size = size
155
156     def callRemote(self, methname, *args, **kwargs):
157         def _call():
158             meth = getattr(self, "remote_" + methname)
159             return meth(*args, **kwargs)
160         d = fireEventually()
161         d.addCallback(lambda res: _call())
162         return d
163
164
165     def callRemoteOnly(self, methname, *args, **kwargs):
166         d = self.callRemote(methname, *args, **kwargs)
167         del d # callRemoteOnly ignores this
168         return None
169
170
171     def remote_write(self, offset, data):
172         precondition(not self.closed)
173         precondition(offset >= 0)
174         precondition(offset+len(data) <= self._size,
175                      "offset=%d + data=%d > size=%d" %
176                      (offset, len(data), self._size))
177         self.data.seek(offset)
178         self.data.write(data)
179
180     def remote_close(self):
181         precondition(not self.closed)
182         self.closed = True
183
184     def remote_abort(self):
185         pass
186
187 class FakeClient:
188     DEFAULT_ENCODING_PARAMETERS = {"k":25,
189                                    "happy": 25,
190                                    "n": 100,
191                                    "max_segment_size": 1*MiB,
192                                    }
193     def __init__(self, mode="good", num_servers=50):
194         self.num_servers = num_servers
195         if type(mode) is str:
196             mode = dict([i,mode] for i in range(num_servers))
197         servers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
198                     for fakeid in range(self.num_servers) ]
199         self.storage_broker = StorageFarmBroker(None, permute_peers=True)
200         for (serverid, rref) in servers:
201             ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid),
202                    "permutation-seed-base32": base32.b2a(serverid) }
203             self.storage_broker.test_add_rref(serverid, rref, ann)
204         self.last_servers = [s[1] for s in servers]
205
206     def log(self, *args, **kwargs):
207         pass
208     def get_encoding_parameters(self):
209         return self.DEFAULT_ENCODING_PARAMETERS
210     def get_storage_broker(self):
211         return self.storage_broker
212     _secret_holder = client.SecretHolder("lease secret", "convergence secret")
213
214 class GotTooFarError(Exception):
215     pass
216
217 class GiganticUploadable(upload.FileHandle):
218     def __init__(self, size):
219         self._size = size
220         self._fp = 0
221
222     def get_encryption_key(self):
223         return defer.succeed("\x00" * 16)
224     def get_size(self):
225         return defer.succeed(self._size)
226     def read(self, length):
227         left = self._size - self._fp
228         length = min(left, length)
229         self._fp += length
230         if self._fp > 1000000:
231             # terminate the test early.
232             raise GotTooFarError("we shouldn't be allowed to get this far")
233         return defer.succeed(["\x00" * length])
234     def close(self):
235         pass
236
237 DATA = """
238 Once upon a time, there was a beautiful princess named Buttercup. She lived
239 in a magical land where every file was stored securely among millions of
240 machines, and nobody ever worried about their data being lost ever again.
241 The End.
242 """
243 assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
244
245 SIZE_ZERO = 0
246 SIZE_SMALL = 16
247 SIZE_LARGE = len(DATA)
248
249 def upload_data(uploader, data):
250     u = upload.Data(data, convergence=None)
251     return uploader.upload(u)
252 def upload_filename(uploader, filename):
253     u = upload.FileName(filename, convergence=None)
254     return uploader.upload(u)
255 def upload_filehandle(uploader, fh):
256     u = upload.FileHandle(fh, convergence=None)
257     return uploader.upload(u)
258
259 class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
260     def setUp(self):
261         self.node = FakeClient(mode="good")
262         self.u = upload.Uploader()
263         self.u.running = True
264         self.u.parent = self.node
265
266     def _check_small(self, newuri, size):
267         u = uri.from_string(newuri)
268         self.failUnless(isinstance(u, uri.LiteralFileURI))
269         self.failUnlessEqual(len(u.data), size)
270
271     def _check_large(self, newuri, size):
272         u = uri.from_string(newuri)
273         self.failUnless(isinstance(u, uri.CHKFileURI))
274         self.failUnless(isinstance(u.get_storage_index(), str))
275         self.failUnlessEqual(len(u.get_storage_index()), 16)
276         self.failUnless(isinstance(u.key, str))
277         self.failUnlessEqual(len(u.key), 16)
278         self.failUnlessEqual(u.size, size)
279
280     def get_data(self, size):
281         return DATA[:size]
282
283     def test_too_large(self):
284         # we've removed the 4GiB share size limit (see ticket #346 for
285         # details), but still have an 8-byte field, so the limit is now
286         # 2**64, so make sure we reject files larger than that.
287         k = 3; happy = 7; n = 10
288         self.set_encoding_parameters(k, happy, n)
289         big = k*(2**64)
290         data1 = GiganticUploadable(big)
291         d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
292                             "This file is too large to be uploaded (data_size)",
293                             self.u.upload, data1)
294         data2 = GiganticUploadable(big-3)
295         d.addCallback(lambda res:
296                       self.shouldFail(FileTooLargeError,
297                                       "test_too_large-data2",
298                                       "This file is too large to be uploaded (offsets)",
299                                       self.u.upload, data2))
300         # I don't know where the actual limit is.. it depends upon how large
301         # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
302         return d
303
304     def test_data_zero(self):
305         data = self.get_data(SIZE_ZERO)
306         d = upload_data(self.u, data)
307         d.addCallback(extract_uri)
308         d.addCallback(self._check_small, SIZE_ZERO)
309         return d
310
311     def test_data_small(self):
312         data = self.get_data(SIZE_SMALL)
313         d = upload_data(self.u, data)
314         d.addCallback(extract_uri)
315         d.addCallback(self._check_small, SIZE_SMALL)
316         return d
317
318     def test_data_large(self):
319         data = self.get_data(SIZE_LARGE)
320         d = upload_data(self.u, data)
321         d.addCallback(extract_uri)
322         d.addCallback(self._check_large, SIZE_LARGE)
323         return d
324
325     def test_data_large_odd_segments(self):
326         data = self.get_data(SIZE_LARGE)
327         segsize = int(SIZE_LARGE / 2.5)
328         # we want 3 segments, since that's not a power of two
329         self.set_encoding_parameters(25, 25, 100, segsize)
330         d = upload_data(self.u, data)
331         d.addCallback(extract_uri)
332         d.addCallback(self._check_large, SIZE_LARGE)
333         return d
334
335     def test_filehandle_zero(self):
336         data = self.get_data(SIZE_ZERO)
337         d = upload_filehandle(self.u, StringIO(data))
338         d.addCallback(extract_uri)
339         d.addCallback(self._check_small, SIZE_ZERO)
340         return d
341
342     def test_filehandle_small(self):
343         data = self.get_data(SIZE_SMALL)
344         d = upload_filehandle(self.u, StringIO(data))
345         d.addCallback(extract_uri)
346         d.addCallback(self._check_small, SIZE_SMALL)
347         return d
348
349     def test_filehandle_large(self):
350         data = self.get_data(SIZE_LARGE)
351         d = upload_filehandle(self.u, StringIO(data))
352         d.addCallback(extract_uri)
353         d.addCallback(self._check_large, SIZE_LARGE)
354         return d
355
356     def test_filename_zero(self):
357         fn = "Uploader-test_filename_zero.data"
358         f = open(fn, "wb")
359         data = self.get_data(SIZE_ZERO)
360         f.write(data)
361         f.close()
362         d = upload_filename(self.u, fn)
363         d.addCallback(extract_uri)
364         d.addCallback(self._check_small, SIZE_ZERO)
365         return d
366
367     def test_filename_small(self):
368         fn = "Uploader-test_filename_small.data"
369         f = open(fn, "wb")
370         data = self.get_data(SIZE_SMALL)
371         f.write(data)
372         f.close()
373         d = upload_filename(self.u, fn)
374         d.addCallback(extract_uri)
375         d.addCallback(self._check_small, SIZE_SMALL)
376         return d
377
378     def test_filename_large(self):
379         fn = "Uploader-test_filename_large.data"
380         f = open(fn, "wb")
381         data = self.get_data(SIZE_LARGE)
382         f.write(data)
383         f.close()
384         d = upload_filename(self.u, fn)
385         d.addCallback(extract_uri)
386         d.addCallback(self._check_large, SIZE_LARGE)
387         return d
388
389 class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
390     def make_node(self, mode, num_servers=10):
391         self.node = FakeClient(mode, num_servers)
392         self.u = upload.Uploader()
393         self.u.running = True
394         self.u.parent = self.node
395
396     def _check_large(self, newuri, size):
397         u = uri.from_string(newuri)
398         self.failUnless(isinstance(u, uri.CHKFileURI))
399         self.failUnless(isinstance(u.get_storage_index(), str))
400         self.failUnlessEqual(len(u.get_storage_index()), 16)
401         self.failUnless(isinstance(u.key, str))
402         self.failUnlessEqual(len(u.key), 16)
403         self.failUnlessEqual(u.size, size)
404
405     def test_first_error(self):
406         mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
407         self.make_node(mode)
408         self.set_encoding_parameters(k=25, happy=1, n=50)
409         d = upload_data(self.u, DATA)
410         d.addCallback(extract_uri)
411         d.addCallback(self._check_large, SIZE_LARGE)
412         return d
413
414     def test_first_error_all(self):
415         self.make_node("first-fail")
416         d = self.shouldFail(UploadUnhappinessError, "first_error_all",
417                             "server selection failed",
418                             upload_data, self.u, DATA)
419         def _check((f,)):
420             self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
421             # there should also be a 'last failure was' message
422             self.failUnlessIn("ServerError", str(f.value))
423         d.addCallback(_check)
424         return d
425
426     def test_second_error(self):
427         # we want to make sure we make it to a third pass. This means that
428         # the first pass was insufficient to place all shares, and at least
429         # one of second pass servers (other than the last one) accepted a
430         # share (so we'll believe that a third pass will be useful). (if
431         # everyone but the last server throws an error, then we'll send all
432         # the remaining shares to the last server at the end of the second
433         # pass, and if that succeeds, we won't make it to a third pass).
434         #
435         # we can achieve this 97.5% of the time by using 40 servers, having
436         # 39 of them fail on the second request, leaving only one to succeed
437         # on the second request. (we need to keep the number of servers low
438         # enough to ensure a second pass with 100 shares).
439         mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
440         self.make_node(mode, 40)
441         d = upload_data(self.u, DATA)
442         d.addCallback(extract_uri)
443         d.addCallback(self._check_large, SIZE_LARGE)
444         return d
445
446     def test_second_error_all(self):
447         self.make_node("second-fail")
448         d = self.shouldFail(UploadUnhappinessError, "second_error_all",
449                             "server selection failed",
450                             upload_data, self.u, DATA)
451         def _check((f,)):
452             self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
453             # there should also be a 'last failure was' message
454             self.failUnlessIn("ServerError", str(f.value))
455         d.addCallback(_check)
456         return d
457
458 class FullServer(unittest.TestCase):
459     def setUp(self):
460         self.node = FakeClient(mode="full")
461         self.u = upload.Uploader()
462         self.u.running = True
463         self.u.parent = self.node
464
465     def _should_fail(self, f):
466         self.failUnless(isinstance(f, Failure) and f.check(UploadUnhappinessError), f)
467
468     def test_data_large(self):
469         data = DATA
470         d = upload_data(self.u, data)
471         d.addBoth(self._should_fail)
472         return d
473
474 class ServerSelection(unittest.TestCase):
475
476     def make_client(self, num_servers=50):
477         self.node = FakeClient(mode="good", num_servers=num_servers)
478         self.u = upload.Uploader()
479         self.u.running = True
480         self.u.parent = self.node
481
482     def get_data(self, size):
483         return DATA[:size]
484
485     def _check_large(self, newuri, size):
486         u = uri.from_string(newuri)
487         self.failUnless(isinstance(u, uri.CHKFileURI))
488         self.failUnless(isinstance(u.get_storage_index(), str))
489         self.failUnlessEqual(len(u.get_storage_index()), 16)
490         self.failUnless(isinstance(u.key, str))
491         self.failUnlessEqual(len(u.key), 16)
492         self.failUnlessEqual(u.size, size)
493
494     def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
495         p = {"k": k,
496              "happy": happy,
497              "n": n,
498              "max_segment_size": max_segsize,
499              }
500         self.node.DEFAULT_ENCODING_PARAMETERS = p
501
502     def test_one_each(self):
503         # if we have 50 shares, and there are 50 servers, and they all accept
504         # a share, we should get exactly one share per server
505
506         self.make_client()
507         data = self.get_data(SIZE_LARGE)
508         self.set_encoding_parameters(25, 30, 50)
509         d = upload_data(self.u, data)
510         d.addCallback(extract_uri)
511         d.addCallback(self._check_large, SIZE_LARGE)
512         def _check(res):
513             for s in self.node.last_servers:
514                 allocated = s.allocated
515                 self.failUnlessEqual(len(allocated), 1)
516                 self.failUnlessEqual(s.queries, 1)
517         d.addCallback(_check)
518         return d
519
520     def test_two_each(self):
521         # if we have 100 shares, and there are 50 servers, and they all
522         # accept all shares, we should get exactly two shares per server
523
524         self.make_client()
525         data = self.get_data(SIZE_LARGE)
526         # if there are 50 servers, then happy needs to be <= 50
527         self.set_encoding_parameters(50, 50, 100)
528         d = upload_data(self.u, data)
529         d.addCallback(extract_uri)
530         d.addCallback(self._check_large, SIZE_LARGE)
531         def _check(res):
532             for s in self.node.last_servers:
533                 allocated = s.allocated
534                 self.failUnlessEqual(len(allocated), 2)
535                 self.failUnlessEqual(s.queries, 2)
536         d.addCallback(_check)
537         return d
538
539     def test_one_each_plus_one_extra(self):
540         # if we have 51 shares, and there are 50 servers, then one server
541         # gets two shares and the rest get just one
542
543         self.make_client()
544         data = self.get_data(SIZE_LARGE)
545         self.set_encoding_parameters(24, 41, 51)
546         d = upload_data(self.u, data)
547         d.addCallback(extract_uri)
548         d.addCallback(self._check_large, SIZE_LARGE)
549         def _check(res):
550             got_one = []
551             got_two = []
552             for s in self.node.last_servers:
553                 allocated = s.allocated
554                 self.failUnless(len(allocated) in (1,2), len(allocated))
555                 if len(allocated) == 1:
556                     self.failUnlessEqual(s.queries, 1)
557                     got_one.append(s)
558                 else:
559                     self.failUnlessEqual(s.queries, 2)
560                     got_two.append(s)
561             self.failUnlessEqual(len(got_one), 49)
562             self.failUnlessEqual(len(got_two), 1)
563         d.addCallback(_check)
564         return d
565
566     def test_four_each(self):
567         # if we have 200 shares, and there are 50 servers, then each server
568         # gets 4 shares. The design goal is to accomplish this with only two
569         # queries per server.
570
571         self.make_client()
572         data = self.get_data(SIZE_LARGE)
573         # if there are 50 servers, then happy should be no more than 50 if we
574         # want this to work.
575         self.set_encoding_parameters(100, 50, 200)
576         d = upload_data(self.u, data)
577         d.addCallback(extract_uri)
578         d.addCallback(self._check_large, SIZE_LARGE)
579         def _check(res):
580             for s in self.node.last_servers:
581                 allocated = s.allocated
582                 self.failUnlessEqual(len(allocated), 4)
583                 self.failUnlessEqual(s.queries, 2)
584         d.addCallback(_check)
585         return d
586
587     def test_three_of_ten(self):
588         # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
589         # 4+4+2
590
591         self.make_client(3)
592         data = self.get_data(SIZE_LARGE)
593         self.set_encoding_parameters(3, 3, 10)
594         d = upload_data(self.u, data)
595         d.addCallback(extract_uri)
596         d.addCallback(self._check_large, SIZE_LARGE)
597         def _check(res):
598             counts = {}
599             for s in self.node.last_servers:
600                 allocated = s.allocated
601                 counts[len(allocated)] = counts.get(len(allocated), 0) + 1
602             histogram = [counts.get(i, 0) for i in range(5)]
603             self.failUnlessEqual(histogram, [0,0,0,2,1])
604         d.addCallback(_check)
605         return d
606
607     def test_some_big_some_small(self):
608         # 10 shares, 20 servers, but half the servers don't support a
609         # share-size large enough for our file
610         mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)])
611         self.node = FakeClient(mode, num_servers=20)
612         self.u = upload.Uploader()
613         self.u.running = True
614         self.u.parent = self.node
615
616         data = self.get_data(SIZE_LARGE)
617         self.set_encoding_parameters(3, 5, 10)
618         d = upload_data(self.u, data)
619         d.addCallback(extract_uri)
620         d.addCallback(self._check_large, SIZE_LARGE)
621         def _check(res):
622             # we should have put one share each on the big servers, and zero
623             # shares on the small servers
624             total_allocated = 0
625             for p in self.node.last_servers:
626                 if p.mode == "good":
627                     self.failUnlessEqual(len(p.allocated), 1)
628                 elif p.mode == "small":
629                     self.failUnlessEqual(len(p.allocated), 0)
630                 total_allocated += len(p.allocated)
631             self.failUnlessEqual(total_allocated, 10)
632         d.addCallback(_check)
633         return d
634
635
636 class StorageIndex(unittest.TestCase):
637     def test_params_must_matter(self):
638         DATA = "I am some data"
639         PARAMS = Client.DEFAULT_ENCODING_PARAMETERS
640
641         u = upload.Data(DATA, convergence="")
642         u.set_default_encoding_parameters(PARAMS)
643         eu = upload.EncryptAnUploadable(u)
644         d1 = eu.get_storage_index()
645
646         # CHK means the same data should encrypt the same way
647         u = upload.Data(DATA, convergence="")
648         u.set_default_encoding_parameters(PARAMS)
649         eu = upload.EncryptAnUploadable(u)
650         d1a = eu.get_storage_index()
651
652         # but if we use a different convergence string it should be different
653         u = upload.Data(DATA, convergence="wheee!")
654         u.set_default_encoding_parameters(PARAMS)
655         eu = upload.EncryptAnUploadable(u)
656         d1salt1 = eu.get_storage_index()
657
658         # and if we add yet a different convergence it should be different again
659         u = upload.Data(DATA, convergence="NOT wheee!")
660         u.set_default_encoding_parameters(PARAMS)
661         eu = upload.EncryptAnUploadable(u)
662         d1salt2 = eu.get_storage_index()
663
664         # and if we use the first string again it should be the same as last time
665         u = upload.Data(DATA, convergence="wheee!")
666         u.set_default_encoding_parameters(PARAMS)
667         eu = upload.EncryptAnUploadable(u)
668         d1salt1a = eu.get_storage_index()
669
670         # and if we change the encoding parameters, it should be different (from the same convergence string with different encoding parameters)
671         u = upload.Data(DATA, convergence="")
672         u.set_default_encoding_parameters(PARAMS)
673         u.encoding_param_k = u.default_encoding_param_k + 1
674         eu = upload.EncryptAnUploadable(u)
675         d2 = eu.get_storage_index()
676
677         # and if we use a random key, it should be different than the CHK
678         u = upload.Data(DATA, convergence=None)
679         u.set_default_encoding_parameters(PARAMS)
680         eu = upload.EncryptAnUploadable(u)
681         d3 = eu.get_storage_index()
682         # and different from another instance
683         u = upload.Data(DATA, convergence=None)
684         u.set_default_encoding_parameters(PARAMS)
685         eu = upload.EncryptAnUploadable(u)
686         d4 = eu.get_storage_index()
687
688         d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
689         def _done(res):
690             si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
691             self.failUnlessEqual(si1, si1a)
692             self.failIfEqual(si1, si2)
693             self.failIfEqual(si1, si3)
694             self.failIfEqual(si1, si4)
695             self.failIfEqual(si3, si4)
696             self.failIfEqual(si1salt1, si1)
697             self.failIfEqual(si1salt1, si1salt2)
698             self.failIfEqual(si1salt2, si1)
699             self.failUnlessEqual(si1salt1, si1salt1a)
700         d.addCallback(_done)
701         return d
702
703 # copied from python docs because itertools.combinations was added in
704 # python 2.6 and we support >= 2.4.
705 def combinations(iterable, r):
706     # combinations('ABCD', 2) --> AB AC AD BC BD CD
707     # combinations(range(4), 3) --> 012 013 023 123
708     pool = tuple(iterable)
709     n = len(pool)
710     if r > n:
711         return
712     indices = range(r)
713     yield tuple(pool[i] for i in indices)
714     while True:
715         for i in reversed(range(r)):
716             if indices[i] != i + n - r:
717                 break
718         else:
719             return
720         indices[i] += 1
721         for j in range(i+1, r):
722             indices[j] = indices[j-1] + 1
723         yield tuple(pool[i] for i in indices)
724
725 def is_happy_enough(servertoshnums, h, k):
726     """ I calculate whether servertoshnums achieves happiness level h. I do this with a naïve "brute force search" approach. (See src/allmydata/util/happinessutil.py for a better algorithm.) """
727     if len(servertoshnums) < h:
728         return False
729     # print "servertoshnums: ", servertoshnums, h, k
730     for happysetcombo in combinations(servertoshnums.iterkeys(), h):
731         # print "happysetcombo: ", happysetcombo
732         for subsetcombo in combinations(happysetcombo, k):
733             shnums = reduce(set.union, [ servertoshnums[s] for s in subsetcombo ])
734             # print "subsetcombo: ", subsetcombo, ", shnums: ", shnums
735             if len(shnums) < k:
736                 # print "NOT HAAPP{Y", shnums, k
737                 return False
738     # print "HAAPP{Y"
739     return True
740
741 class FakeServerTracker:
742     def __init__(self, serverid, buckets):
743         self._serverid = serverid
744         self.buckets = buckets
745     def get_serverid(self):
746         return self._serverid
747
748 class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
749     ShouldFailMixin):
750     def find_all_shares(self, unused=None):
751         """Locate shares on disk. Returns a dict that maps
752         server to set of sharenums.
753         """
754         assert self.g, "I tried to find a grid at self.g, but failed"
755         servertoshnums = {} # k: server, v: set(shnum)
756
757         for i, c in self.g.servers_by_number.iteritems():
758             for (dirp, dirns, fns) in os.walk(c.sharedir):
759                 for fn in fns:
760                     try:
761                         sharenum = int(fn)
762                     except TypeError:
763                         # Whoops, I guess that's not a share file then.
764                         pass
765                     else:
766                         servertoshnums.setdefault(i, set()).add(sharenum)
767
768         return servertoshnums
769
770     def _do_upload_with_broken_servers(self, servers_to_break):
771         """
772         I act like a normal upload, but before I send the results of
773         Tahoe2ServerSelector to the Encoder, I break the first
774         servers_to_break ServerTrackers in the upload_servers part of the
775         return result.
776         """
777         assert self.g, "I tried to find a grid at self.g, but failed"
778         broker = self.g.clients[0].storage_broker
779         sh     = self.g.clients[0]._secret_holder
780         data = upload.Data("data" * 10000, convergence="")
781         data.set_default_encoding_parameters({'k': 3, 'happy': 4, 'n': 10})
782         uploadable = upload.EncryptAnUploadable(data)
783         encoder = encode.Encoder()
784         encoder.set_encrypted_uploadable(uploadable)
785         status = upload.UploadStatus()
786         selector = upload.Tahoe2ServerSelector("dglev", "test", status)
787         storage_index = encoder.get_param("storage_index")
788         share_size = encoder.get_param("share_size")
789         block_size = encoder.get_param("block_size")
790         num_segments = encoder.get_param("num_segments")
791         d = selector.get_shareholders(broker, sh, storage_index,
792                                       share_size, block_size, num_segments,
793                                       10, 3, 4)
794         def _have_shareholders((upload_trackers, already_servers)):
795             assert servers_to_break <= len(upload_trackers)
796             for index in xrange(servers_to_break):
797                 tracker = list(upload_trackers)[index]
798                 for share in tracker.buckets.keys():
799                     tracker.buckets[share].abort()
800             buckets = {}
801             servermap = already_servers.copy()
802             for tracker in upload_trackers:
803                 buckets.update(tracker.buckets)
804                 for bucket in tracker.buckets:
805                     servermap.setdefault(bucket, set()).add(tracker.get_serverid())
806             encoder.set_shareholders(buckets, servermap)
807             d = encoder.start()
808             return d
809         d.addCallback(_have_shareholders)
810         return d
811
812     def _has_happy_share_distribution(self):
813         servertoshnums = self.find_all_shares()
814         k = self.g.clients[0].DEFAULT_ENCODING_PARAMETERS['k']
815         h = self.g.clients[0].DEFAULT_ENCODING_PARAMETERS['happy']
816         return is_happy_enough(servertoshnums, h, k)
817
818     def _add_server(self, server_number, readonly=False):
819         assert self.g, "I tried to find a grid at self.g, but failed"
820         ss = self.g.make_server(server_number, readonly)
821         log.msg("just created a server, number: %s => %s" % (server_number, ss,))
822         self.g.add_server(server_number, ss)
823
824     def _add_server_with_share(self, server_number, share_number=None,
825                                readonly=False):
826         self._add_server(server_number, readonly)
827         if share_number is not None:
828             self._copy_share_to_server(share_number, server_number)
829
830
831     def _copy_share_to_server(self, share_number, server_number):
832         ss = self.g.servers_by_number[server_number]
833         # Copy share i from the directory associated with the first
834         # storage server to the directory associated with this one.
835         assert self.g, "I tried to find a grid at self.g, but failed"
836         assert self.shares, "I tried to find shares at self.shares, but failed"
837         old_share_location = self.shares[share_number][2]
838         new_share_location = os.path.join(ss.storedir, "shares")
839         si = uri.from_string(self.uri).get_storage_index()
840         new_share_location = os.path.join(new_share_location,
841                                           storage_index_to_dir(si))
842         if not os.path.exists(new_share_location):
843             os.makedirs(new_share_location)
844         new_share_location = os.path.join(new_share_location,
845                                           str(share_number))
846         if old_share_location != new_share_location:
847             shutil.copy(old_share_location, new_share_location)
848         shares = self.find_uri_shares(self.uri)
849         # Make sure that the storage server has the share.
850         self.failUnless((share_number, ss.my_nodeid, new_share_location)
851                         in shares)
852
853     def _setup_grid(self):
854         """
855         I set up a NoNetworkGrid with a single server and client.
856         """
857         self.set_up_grid(num_clients=1, num_servers=1)
858
859     def _setup_and_upload(self, **kwargs):
860         """
861         I set up a NoNetworkGrid with a single server and client,
862         upload a file to it, store its uri in self.uri, and store its
863         sharedata in self.shares.
864         """
865         self._setup_grid()
866         client = self.g.clients[0]
867         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
868         if "n" in kwargs and "k" in kwargs:
869             client.DEFAULT_ENCODING_PARAMETERS['k'] = kwargs['k']
870             client.DEFAULT_ENCODING_PARAMETERS['n'] = kwargs['n']
871         data = upload.Data("data" * 10000, convergence="")
872         self.data = data
873         d = client.upload(data)
874         def _store_uri(ur):
875             self.uri = ur.get_uri()
876         d.addCallback(_store_uri)
877         d.addCallback(lambda ign:
878             self.find_uri_shares(self.uri))
879         def _store_shares(shares):
880             self.shares = shares
881         d.addCallback(_store_shares)
882         return d
883
884     def test_configure_parameters(self):
885         self.basedir = self.mktemp()
886         hooks = {0: self._set_up_nodes_extra_config}
887         self.set_up_grid(client_config_hooks=hooks)
888         c0 = self.g.clients[0]
889
890         DATA = "data" * 100
891         u = upload.Data(DATA, convergence="")
892         d = c0.upload(u)
893         d.addCallback(lambda ur: c0.create_node_from_uri(ur.get_uri()))
894         m = monitor.Monitor()
895         d.addCallback(lambda fn: fn.check(m))
896         def _check(cr):
897             self.failUnlessEqual(cr.get_encoding_needed(), 7)
898             self.failUnlessEqual(cr.get_encoding_expected(), 12)
899         d.addCallback(_check)
900         return d
901
902
903     def _setUp(self, ns):
904         # Used by test_happy_semantics and test_preexisting_share_behavior
905         # to set up the grid.
906         self.node = FakeClient(mode="good", num_servers=ns)
907         self.u = upload.Uploader()
908         self.u.running = True
909         self.u.parent = self.node
910
911
912     def test_happy_semantics(self):
913         self._setUp(2)
914         DATA = upload.Data("kittens" * 10000, convergence="")
915         # These parameters are unsatisfiable with only 2 servers.
916         self.set_encoding_parameters(k=3, happy=5, n=10)
917         d = self.shouldFail(UploadUnhappinessError, "test_happy_semantics",
918                             "shares could be placed or found on only 2 "
919                             "server(s). We were asked to place shares on "
920                             "at least 5 server(s) such that any 3 of them "
921                             "have enough shares to recover the file",
922                             self.u.upload, DATA)
923         # Let's reset the client to have 10 servers
924         d.addCallback(lambda ign:
925             self._setUp(10))
926         # These parameters are satisfiable with 10 servers.
927         d.addCallback(lambda ign:
928             self.set_encoding_parameters(k=3, happy=5, n=10))
929         d.addCallback(lambda ign:
930             self.u.upload(DATA))
931         # Let's reset the client to have 7 servers
932         # (this is less than n, but more than h)
933         d.addCallback(lambda ign:
934             self._setUp(7))
935         # These parameters are satisfiable with 7 servers.
936         d.addCallback(lambda ign:
937             self.set_encoding_parameters(k=3, happy=5, n=10))
938         d.addCallback(lambda ign:
939             self.u.upload(DATA))
940         return d
941
942     def test_aborted_shares(self):
943         self.basedir = "upload/EncodingParameters/aborted_shares"
944         self.set_up_grid(num_servers=4)
945         c = self.g.clients[0]
946         DATA = upload.Data(100* "kittens", convergence="")
947         # These parameters are unsatisfiable with only 4 servers, but should
948         # work with 5, as long as the original 4 are not stuck in the open
949         # BucketWriter state (open() but not
950         parms = {"k":2, "happy":5, "n":5, "max_segment_size": 1*MiB}
951         c.DEFAULT_ENCODING_PARAMETERS = parms
952         d = self.shouldFail(UploadUnhappinessError, "test_aborted_shares",
953                             "shares could be placed on only 4 "
954                             "server(s) such that any 2 of them have enough "
955                             "shares to recover the file, but we were asked "
956                             "to place shares on at least 5 such servers",
957                             c.upload, DATA)
958         # now add the 5th server
959         d.addCallback(lambda ign: self._add_server(4, False))
960         # and this time the upload ought to succeed
961         d.addCallback(lambda ign: c.upload(DATA))
962         d.addCallback(lambda ign:
963             self.failUnless(self._has_happy_share_distribution()))
964         return d
965
966
967     def test_problem_layout_comment_52(self):
968         def _basedir():
969             self.basedir = self.mktemp()
970         _basedir()
971         # This scenario is at
972         # http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:52
973         #
974         # The scenario in comment:52 proposes that we have a layout
975         # like:
976         # server 0: shares 1 - 9
977         # server 1: share 0, read-only
978         # server 2: share 0, read-only
979         # server 3: share 0, read-only
980         # To get access to the shares, we will first upload to one
981         # server, which will then have shares 0 - 9. We'll then
982         # add three new servers, configure them to not accept any new
983         # shares, then write share 0 directly into the serverdir of each,
984         # and then remove share 0 from server 0 in the same way.
985         # Then each of servers 1 - 3 will report that they have share 0,
986         # and will not accept any new share, while server 0 will report that
987         # it has shares 1 - 9 and will accept new shares.
988         # We'll then set 'happy' = 4, and see that an upload fails
989         # (as it should)
990         d = self._setup_and_upload()
991         d.addCallback(lambda ign:
992             self._add_server_with_share(server_number=1, share_number=0,
993                                         readonly=True))
994         d.addCallback(lambda ign:
995             self._add_server_with_share(server_number=2, share_number=0,
996                                         readonly=True))
997         d.addCallback(lambda ign:
998             self._add_server_with_share(server_number=3, share_number=0,
999                                         readonly=True))
1000         # Remove the first share from server 0.
1001         def _remove_share_0_from_server_0():
1002             share_location = self.shares[0][2]
1003             os.remove(share_location)
1004         d.addCallback(lambda ign:
1005             _remove_share_0_from_server_0())
1006         # Set happy = 4 in the client.
1007         def _prepare():
1008             client = self.g.clients[0]
1009             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1010             return client
1011         d.addCallback(lambda ign:
1012             _prepare())
1013         # Uploading data should fail
1014         d.addCallback(lambda client:
1015             self.shouldFail(UploadUnhappinessError,
1016                             "test_problem_layout_comment_52_test_1",
1017                             "shares could be placed or found on 4 server(s), "
1018                             "but they are not spread out evenly enough to "
1019                             "ensure that any 3 of these servers would have "
1020                             "enough shares to recover the file. "
1021                             "We were asked to place shares on at "
1022                             "least 4 servers such that any 3 of them have "
1023                             "enough shares to recover the file",
1024                             client.upload, upload.Data("data" * 10000,
1025                                                        convergence="")))
1026
1027         # Do comment:52, but like this:
1028         # server 2: empty
1029         # server 3: share 0, read-only
1030         # server 1: share 0, read-only
1031         # server 0: shares 0-9
1032         d.addCallback(lambda ign:
1033             _basedir())
1034         d.addCallback(lambda ign:
1035             self._setup_and_upload())
1036         d.addCallback(lambda ign:
1037             self._add_server(server_number=2))
1038         d.addCallback(lambda ign:
1039             self._add_server_with_share(server_number=3, share_number=0,
1040                                         readonly=True))
1041         d.addCallback(lambda ign:
1042             self._add_server_with_share(server_number=1, share_number=0,
1043                                         readonly=True))
1044         def _prepare2():
1045             client = self.g.clients[0]
1046             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1047             return client
1048         d.addCallback(lambda ign:
1049             _prepare2())
1050         d.addCallback(lambda client:
1051             self.shouldFail(UploadUnhappinessError,
1052                             "test_problem_layout_comment_52_test_2",
1053                             "shares could be placed on only 3 server(s) such "
1054                             "that any 3 of them have enough shares to recover "
1055                             "the file, but we were asked to place shares on "
1056                             "at least 4 such servers.",
1057                             client.upload, upload.Data("data" * 10000,
1058                                                        convergence="")))
1059         return d
1060
1061
1062     def test_problem_layout_comment_53(self):
1063         # This scenario is at
1064         # http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:53
1065         #
1066         # Set up the grid to have one server
1067         def _change_basedir(ign):
1068             self.basedir = self.mktemp()
1069         _change_basedir(None)
1070         # We start by uploading all of the shares to one server.
1071         # Next, we'll add three new servers to our NoNetworkGrid. We'll add
1072         # one share from our initial upload to each of these.
1073         # The counterintuitive ordering of the share numbers is to deal with
1074         # the permuting of these servers -- distributing the shares this
1075         # way ensures that the Tahoe2ServerSelector sees them in the order
1076         # described below.
1077         d = self._setup_and_upload()
1078         d.addCallback(lambda ign:
1079             self._add_server_with_share(server_number=1, share_number=2))
1080         d.addCallback(lambda ign:
1081             self._add_server_with_share(server_number=2, share_number=0))
1082         d.addCallback(lambda ign:
1083             self._add_server_with_share(server_number=3, share_number=1))
1084         # So, we now have the following layout:
1085         # server 0: shares 0 - 9
1086         # server 1: share 2
1087         # server 2: share 0
1088         # server 3: share 1
1089         # We change the 'happy' parameter in the client to 4.
1090         # The Tahoe2ServerSelector will see the servers permuted as:
1091         # 2, 3, 1, 0
1092         # Ideally, a reupload of our original data should work.
1093         def _reset_encoding_parameters(ign, happy=4):
1094             client = self.g.clients[0]
1095             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1096             return client
1097         d.addCallback(_reset_encoding_parameters)
1098         d.addCallback(lambda client:
1099             client.upload(upload.Data("data" * 10000, convergence="")))
1100         d.addCallback(lambda ign:
1101             self.failUnless(self._has_happy_share_distribution()))
1102
1103
1104         # This scenario is basically comment:53, but changed so that the
1105         # Tahoe2ServerSelector sees the server with all of the shares before
1106         # any of the other servers.
1107         # The layout is:
1108         # server 2: shares 0 - 9
1109         # server 3: share 0
1110         # server 1: share 1
1111         # server 4: share 2
1112         # The Tahoe2ServerSelector sees the servers permuted as:
1113         # 2, 3, 1, 4
1114         # Note that server 0 has been replaced by server 4; this makes it
1115         # easier to ensure that the last server seen by Tahoe2ServerSelector
1116         # has only one share.
1117         d.addCallback(_change_basedir)
1118         d.addCallback(lambda ign:
1119             self._setup_and_upload())
1120         d.addCallback(lambda ign:
1121             self._add_server_with_share(server_number=2, share_number=0))
1122         d.addCallback(lambda ign:
1123             self._add_server_with_share(server_number=3, share_number=1))
1124         d.addCallback(lambda ign:
1125             self._add_server_with_share(server_number=1, share_number=2))
1126         # Copy all of the other shares to server number 2
1127         def _copy_shares(ign):
1128             for i in xrange(0, 10):
1129                 self._copy_share_to_server(i, 2)
1130         d.addCallback(_copy_shares)
1131         # Remove the first server, and add a placeholder with share 0
1132         d.addCallback(lambda ign:
1133             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1134         d.addCallback(lambda ign:
1135             self._add_server_with_share(server_number=4, share_number=0))
1136         # Now try uploading.
1137         d.addCallback(_reset_encoding_parameters)
1138         d.addCallback(lambda client:
1139             client.upload(upload.Data("data" * 10000, convergence="")))
1140         d.addCallback(lambda ign:
1141             self.failUnless(self._has_happy_share_distribution()))
1142
1143
1144         # Try the same thing, but with empty servers after the first one
1145         # We want to make sure that Tahoe2ServerSelector will redistribute
1146         # shares as necessary, not simply discover an existing layout.
1147         # The layout is:
1148         # server 2: shares 0 - 9
1149         # server 3: empty
1150         # server 1: empty
1151         # server 4: empty
1152         d.addCallback(_change_basedir)
1153         d.addCallback(lambda ign:
1154             self._setup_and_upload())
1155         d.addCallback(lambda ign:
1156             self._add_server(server_number=2))
1157         d.addCallback(lambda ign:
1158             self._add_server(server_number=3))
1159         d.addCallback(lambda ign:
1160             self._add_server(server_number=1))
1161         d.addCallback(lambda ign:
1162             self._add_server(server_number=4))
1163         d.addCallback(_copy_shares)
1164         d.addCallback(lambda ign:
1165             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1166         d.addCallback(_reset_encoding_parameters)
1167         d.addCallback(lambda client:
1168             client.upload(upload.Data("data" * 10000, convergence="")))
1169         # Make sure that only as many shares as necessary to satisfy
1170         # servers of happiness were pushed.
1171         d.addCallback(lambda results:
1172             self.failUnlessEqual(results.get_pushed_shares(), 3))
1173         d.addCallback(lambda ign:
1174             self.failUnless(self._has_happy_share_distribution()))
1175         return d
1176
1177     def test_problem_layout_ticket_1124(self):
1178         self.basedir = self.mktemp()
1179         d = self._setup_and_upload(k=2, n=4)
1180
1181         # server 0: shares 0, 1, 2, 3
1182         # server 1: shares 0, 3
1183         # server 2: share 1
1184         # server 3: share 2
1185         # With this layout, an upload should just be satisfied that the current distribution is good enough, right?
1186         def _setup(ign):
1187             self._add_server_with_share(server_number=0, share_number=None)
1188             self._add_server_with_share(server_number=1, share_number=0)
1189             self._add_server_with_share(server_number=2, share_number=1)
1190             self._add_server_with_share(server_number=3, share_number=2)
1191             # Copy shares
1192             self._copy_share_to_server(3, 1)
1193             client = self.g.clients[0]
1194             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1195             return client
1196
1197         d.addCallback(_setup)
1198         d.addCallback(lambda client:
1199             client.upload(upload.Data("data" * 10000, convergence="")))
1200         d.addCallback(lambda ign:
1201             self.failUnless(self._has_happy_share_distribution()))
1202         return d
1203     test_problem_layout_ticket_1124.todo = "Fix this after 1.7.1 release."
1204
1205     def test_happiness_with_some_readonly_servers(self):
1206         # Try the following layout
1207         # server 2: shares 0-9
1208         # server 4: share 0, read-only
1209         # server 3: share 1, read-only
1210         # server 1: share 2, read-only
1211         self.basedir = self.mktemp()
1212         d = self._setup_and_upload()
1213         d.addCallback(lambda ign:
1214             self._add_server_with_share(server_number=2, share_number=0))
1215         d.addCallback(lambda ign:
1216             self._add_server_with_share(server_number=3, share_number=1,
1217                                         readonly=True))
1218         d.addCallback(lambda ign:
1219             self._add_server_with_share(server_number=1, share_number=2,
1220                                         readonly=True))
1221         # Copy all of the other shares to server number 2
1222         def _copy_shares(ign):
1223             for i in xrange(1, 10):
1224                 self._copy_share_to_server(i, 2)
1225         d.addCallback(_copy_shares)
1226         # Remove server 0, and add another in its place
1227         d.addCallback(lambda ign:
1228             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1229         d.addCallback(lambda ign:
1230             self._add_server_with_share(server_number=4, share_number=0,
1231                                         readonly=True))
1232         def _reset_encoding_parameters(ign, happy=4):
1233             client = self.g.clients[0]
1234             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1235             return client
1236         d.addCallback(_reset_encoding_parameters)
1237         d.addCallback(lambda client:
1238             client.upload(upload.Data("data" * 10000, convergence="")))
1239         d.addCallback(lambda ign:
1240             self.failUnless(self._has_happy_share_distribution()))
1241         return d
1242
1243
1244     def test_happiness_with_all_readonly_servers(self):
1245         # server 3: share 1, read-only
1246         # server 1: share 2, read-only
1247         # server 2: shares 0-9, read-only
1248         # server 4: share 0, read-only
1249         # The idea with this test is to make sure that the survey of
1250         # read-only servers doesn't undercount servers of happiness
1251         self.basedir = self.mktemp()
1252         d = self._setup_and_upload()
1253         d.addCallback(lambda ign:
1254             self._add_server_with_share(server_number=4, share_number=0,
1255                                         readonly=True))
1256         d.addCallback(lambda ign:
1257             self._add_server_with_share(server_number=3, share_number=1,
1258                                         readonly=True))
1259         d.addCallback(lambda ign:
1260             self._add_server_with_share(server_number=1, share_number=2,
1261                                         readonly=True))
1262         d.addCallback(lambda ign:
1263             self._add_server_with_share(server_number=2, share_number=0,
1264                                         readonly=True))
1265         def _copy_shares(ign):
1266             for i in xrange(1, 10):
1267                 self._copy_share_to_server(i, 2)
1268         d.addCallback(_copy_shares)
1269         d.addCallback(lambda ign:
1270             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1271         def _reset_encoding_parameters(ign, happy=4):
1272             client = self.g.clients[0]
1273             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1274             return client
1275         d.addCallback(_reset_encoding_parameters)
1276         d.addCallback(lambda client:
1277             client.upload(upload.Data("data" * 10000, convergence="")))
1278         d.addCallback(lambda ign:
1279             self.failUnless(self._has_happy_share_distribution()))
1280         return d
1281
1282
1283     def test_dropped_servers_in_encoder(self):
1284         # The Encoder does its own "servers_of_happiness" check if it
1285         # happens to lose a bucket during an upload (it assumes that
1286         # the layout presented to it satisfies "servers_of_happiness"
1287         # until a failure occurs)
1288         #
1289         # This test simulates an upload where servers break after server
1290         # selection, but before they are written to.
1291         def _set_basedir(ign=None):
1292             self.basedir = self.mktemp()
1293         _set_basedir()
1294         d = self._setup_and_upload();
1295         # Add 5 servers
1296         def _do_server_setup(ign):
1297             self._add_server(server_number=1)
1298             self._add_server(server_number=2)
1299             self._add_server(server_number=3)
1300             self._add_server(server_number=4)
1301             self._add_server(server_number=5)
1302         d.addCallback(_do_server_setup)
1303         # remove the original server
1304         # (necessary to ensure that the Tahoe2ServerSelector will distribute
1305         #  all the shares)
1306         def _remove_server(ign):
1307             server = self.g.servers_by_number[0]
1308             self.g.remove_server(server.my_nodeid)
1309         d.addCallback(_remove_server)
1310         # This should succeed; we still have 4 servers, and the
1311         # happiness of the upload is 4.
1312         d.addCallback(lambda ign:
1313             self._do_upload_with_broken_servers(1))
1314         # Now, do the same thing over again, but drop 2 servers instead
1315         # of 1. This should fail, because servers_of_happiness is 4 and
1316         # we can't satisfy that.
1317         d.addCallback(_set_basedir)
1318         d.addCallback(lambda ign:
1319             self._setup_and_upload())
1320         d.addCallback(_do_server_setup)
1321         d.addCallback(_remove_server)
1322         d.addCallback(lambda ign:
1323             self.shouldFail(UploadUnhappinessError,
1324                             "test_dropped_servers_in_encoder",
1325                             "shares could be placed on only 3 server(s) "
1326                             "such that any 3 of them have enough shares to "
1327                             "recover the file, but we were asked to place "
1328                             "shares on at least 4",
1329                             self._do_upload_with_broken_servers, 2))
1330         # Now do the same thing over again, but make some of the servers
1331         # readonly, break some of the ones that aren't, and make sure that
1332         # happiness accounting is preserved.
1333         d.addCallback(_set_basedir)
1334         d.addCallback(lambda ign:
1335             self._setup_and_upload())
1336         def _do_server_setup_2(ign):
1337             self._add_server(1)
1338             self._add_server(2)
1339             self._add_server(3)
1340             self._add_server_with_share(4, 7, readonly=True)
1341             self._add_server_with_share(5, 8, readonly=True)
1342         d.addCallback(_do_server_setup_2)
1343         d.addCallback(_remove_server)
1344         d.addCallback(lambda ign:
1345             self._do_upload_with_broken_servers(1))
1346         d.addCallback(_set_basedir)
1347         d.addCallback(lambda ign:
1348             self._setup_and_upload())
1349         d.addCallback(_do_server_setup_2)
1350         d.addCallback(_remove_server)
1351         d.addCallback(lambda ign:
1352             self.shouldFail(UploadUnhappinessError,
1353                             "test_dropped_servers_in_encoder",
1354                             "shares could be placed on only 3 server(s) "
1355                             "such that any 3 of them have enough shares to "
1356                             "recover the file, but we were asked to place "
1357                             "shares on at least 4",
1358                             self._do_upload_with_broken_servers, 2))
1359         return d
1360
1361
1362     def test_merge_servers(self):
1363         # merge_servers merges a list of upload_servers and a dict of
1364         # shareid -> serverid mappings.
1365         shares = {
1366                     1 : set(["server1"]),
1367                     2 : set(["server2"]),
1368                     3 : set(["server3"]),
1369                     4 : set(["server4", "server5"]),
1370                     5 : set(["server1", "server2"]),
1371                  }
1372         # if not provided with a upload_servers argument, it should just
1373         # return the first argument unchanged.
1374         self.failUnlessEqual(shares, merge_servers(shares, set([])))
1375         trackers = []
1376         for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1377             t = FakeServerTracker(server, [i])
1378             trackers.append(t)
1379         expected = {
1380                     1 : set(["server1"]),
1381                     2 : set(["server2"]),
1382                     3 : set(["server3"]),
1383                     4 : set(["server4", "server5"]),
1384                     5 : set(["server1", "server2", "server5"]),
1385                     6 : set(["server6"]),
1386                     7 : set(["server7"]),
1387                     8 : set(["server8"]),
1388                    }
1389         self.failUnlessEqual(expected, merge_servers(shares, set(trackers)))
1390         shares2 = {}
1391         expected = {
1392                     5 : set(["server5"]),
1393                     6 : set(["server6"]),
1394                     7 : set(["server7"]),
1395                     8 : set(["server8"]),
1396                    }
1397         self.failUnlessEqual(expected, merge_servers(shares2, set(trackers)))
1398         shares3 = {}
1399         trackers = []
1400         expected = {}
1401         for (i, server) in [(i, "server%d" % i) for i in xrange(10)]:
1402             shares3[i] = set([server])
1403             t = FakeServerTracker(server, [i])
1404             trackers.append(t)
1405             expected[i] = set([server])
1406         self.failUnlessEqual(expected, merge_servers(shares3, set(trackers)))
1407
1408
1409     def test_servers_of_happiness_utility_function(self):
1410         # These tests are concerned with the servers_of_happiness()
1411         # utility function, and its underlying matching algorithm. Other
1412         # aspects of the servers_of_happiness behavior are tested
1413         # elsehwere These tests exist to ensure that
1414         # servers_of_happiness doesn't under or overcount the happiness
1415         # value for given inputs.
1416
1417         # servers_of_happiness expects a dict of
1418         # shnum => set(serverids) as a preexisting shares argument.
1419         test1 = {
1420                  1 : set(["server1"]),
1421                  2 : set(["server2"]),
1422                  3 : set(["server3"]),
1423                  4 : set(["server4"])
1424                 }
1425         happy = servers_of_happiness(test1)
1426         self.failUnlessEqual(4, happy)
1427         test1[4] = set(["server1"])
1428         # We've added a duplicate server, so now servers_of_happiness
1429         # should be 3 instead of 4.
1430         happy = servers_of_happiness(test1)
1431         self.failUnlessEqual(3, happy)
1432         # The second argument of merge_servers should be a set of objects with
1433         # serverid and buckets as attributes. In actual use, these will be
1434         # ServerTracker instances, but for testing it is fine to make a
1435         # FakeServerTracker whose job is to hold those instance variables to
1436         # test that part.
1437         trackers = []
1438         for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1439             t = FakeServerTracker(server, [i])
1440             trackers.append(t)
1441         # Recall that test1 is a server layout with servers_of_happiness
1442         # = 3.  Since there isn't any overlap between the shnum ->
1443         # set([serverid]) correspondences in test1 and those in trackers,
1444         # the result here should be 7.
1445         test2 = merge_servers(test1, set(trackers))
1446         happy = servers_of_happiness(test2)
1447         self.failUnlessEqual(7, happy)
1448         # Now add an overlapping server to trackers. This is redundant,
1449         # so it should not cause the previously reported happiness value
1450         # to change.
1451         t = FakeServerTracker("server1", [1])
1452         trackers.append(t)
1453         test2 = merge_servers(test1, set(trackers))
1454         happy = servers_of_happiness(test2)
1455         self.failUnlessEqual(7, happy)
1456         test = {}
1457         happy = servers_of_happiness(test)
1458         self.failUnlessEqual(0, happy)
1459         # Test a more substantial overlap between the trackers and the
1460         # existing assignments.
1461         test = {
1462             1 : set(['server1']),
1463             2 : set(['server2']),
1464             3 : set(['server3']),
1465             4 : set(['server4']),
1466         }
1467         trackers = []
1468         t = FakeServerTracker('server5', [4])
1469         trackers.append(t)
1470         t = FakeServerTracker('server6', [3, 5])
1471         trackers.append(t)
1472         # The value returned by servers_of_happiness is the size
1473         # of a maximum matching in the bipartite graph that
1474         # servers_of_happiness() makes between serverids and share
1475         # numbers. It should find something like this:
1476         # (server 1, share 1)
1477         # (server 2, share 2)
1478         # (server 3, share 3)
1479         # (server 5, share 4)
1480         # (server 6, share 5)
1481         #
1482         # and, since there are 5 edges in this matching, it should
1483         # return 5.
1484         test2 = merge_servers(test, set(trackers))
1485         happy = servers_of_happiness(test2)
1486         self.failUnlessEqual(5, happy)
1487         # Zooko's first puzzle:
1488         # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:156)
1489         #
1490         # server 1: shares 0, 1
1491         # server 2: shares 1, 2
1492         # server 3: share 2
1493         #
1494         # This should yield happiness of 3.
1495         test = {
1496             0 : set(['server1']),
1497             1 : set(['server1', 'server2']),
1498             2 : set(['server2', 'server3']),
1499         }
1500         self.failUnlessEqual(3, servers_of_happiness(test))
1501         # Zooko's second puzzle:
1502         # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:158)
1503         #
1504         # server 1: shares 0, 1
1505         # server 2: share 1
1506         #
1507         # This should yield happiness of 2.
1508         test = {
1509             0 : set(['server1']),
1510             1 : set(['server1', 'server2']),
1511         }
1512         self.failUnlessEqual(2, servers_of_happiness(test))
1513
1514
1515     def test_shares_by_server(self):
1516         test = dict([(i, set(["server%d" % i])) for i in xrange(1, 5)])
1517         sbs = shares_by_server(test)
1518         self.failUnlessEqual(set([1]), sbs["server1"])
1519         self.failUnlessEqual(set([2]), sbs["server2"])
1520         self.failUnlessEqual(set([3]), sbs["server3"])
1521         self.failUnlessEqual(set([4]), sbs["server4"])
1522         test1 = {
1523                     1 : set(["server1"]),
1524                     2 : set(["server1"]),
1525                     3 : set(["server1"]),
1526                     4 : set(["server2"]),
1527                     5 : set(["server2"])
1528                 }
1529         sbs = shares_by_server(test1)
1530         self.failUnlessEqual(set([1, 2, 3]), sbs["server1"])
1531         self.failUnlessEqual(set([4, 5]), sbs["server2"])
1532         # This should fail unless the serverid part of the mapping is a set
1533         test2 = {1: "server1"}
1534         self.shouldFail(AssertionError,
1535                        "test_shares_by_server",
1536                        "",
1537                        shares_by_server, test2)
1538
1539
1540     def test_existing_share_detection(self):
1541         self.basedir = self.mktemp()
1542         d = self._setup_and_upload()
1543         # Our final setup should look like this:
1544         # server 1: shares 0 - 9, read-only
1545         # server 2: empty
1546         # server 3: empty
1547         # server 4: empty
1548         # The purpose of this test is to make sure that the server selector
1549         # knows about the shares on server 1, even though it is read-only.
1550         # It used to simply filter these out, which would cause the test
1551         # to fail when servers_of_happiness = 4.
1552         d.addCallback(lambda ign:
1553             self._add_server_with_share(1, 0, True))
1554         d.addCallback(lambda ign:
1555             self._add_server(2))
1556         d.addCallback(lambda ign:
1557             self._add_server(3))
1558         d.addCallback(lambda ign:
1559             self._add_server(4))
1560         def _copy_shares(ign):
1561             for i in xrange(1, 10):
1562                 self._copy_share_to_server(i, 1)
1563         d.addCallback(_copy_shares)
1564         d.addCallback(lambda ign:
1565             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1566         def _prepare_client(ign):
1567             client = self.g.clients[0]
1568             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1569             return client
1570         d.addCallback(_prepare_client)
1571         d.addCallback(lambda client:
1572             client.upload(upload.Data("data" * 10000, convergence="")))
1573         d.addCallback(lambda ign:
1574             self.failUnless(self._has_happy_share_distribution()))
1575         return d
1576
1577
1578     def test_query_counting(self):
1579         # If server selection fails, Tahoe2ServerSelector prints out a lot
1580         # of helpful diagnostic information, including query stats.
1581         # This test helps make sure that that information is accurate.
1582         self.basedir = self.mktemp()
1583         d = self._setup_and_upload()
1584         def _setup(ign):
1585             for i in xrange(1, 11):
1586                 self._add_server(server_number=i)
1587             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1588             c = self.g.clients[0]
1589             # We set happy to an unsatisfiable value so that we can check the
1590             # counting in the exception message. The same progress message
1591             # is also used when the upload is successful, but in that case it
1592             # only gets written to a log, so we can't see what it says.
1593             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1594             return c
1595         d.addCallback(_setup)
1596         d.addCallback(lambda c:
1597             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1598                             "10 queries placed some shares",
1599                             c.upload, upload.Data("data" * 10000,
1600                                                   convergence="")))
1601         # Now try with some readonly servers. We want to make sure that
1602         # the readonly server share discovery phase is counted correctly.
1603         def _reset(ign):
1604             self.basedir = self.mktemp()
1605             self.g = None
1606         d.addCallback(_reset)
1607         d.addCallback(lambda ign:
1608             self._setup_and_upload())
1609         def _then(ign):
1610             for i in xrange(1, 11):
1611                 self._add_server(server_number=i)
1612             self._add_server(server_number=11, readonly=True)
1613             self._add_server(server_number=12, readonly=True)
1614             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1615             c = self.g.clients[0]
1616             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1617             return c
1618         d.addCallback(_then)
1619         d.addCallback(lambda c:
1620             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1621                             "2 placed none (of which 2 placed none due to "
1622                             "the server being full",
1623                             c.upload, upload.Data("data" * 10000,
1624                                                   convergence="")))
1625         # Now try the case where the upload process finds a bunch of the
1626         # shares that it wants to place on the first server, including
1627         # the one that it wanted to allocate there. Though no shares will
1628         # be allocated in this request, it should still be called
1629         # productive, since it caused some homeless shares to be
1630         # removed.
1631         d.addCallback(_reset)
1632         d.addCallback(lambda ign:
1633             self._setup_and_upload())
1634
1635         def _next(ign):
1636             for i in xrange(1, 11):
1637                 self._add_server(server_number=i)
1638             # Copy all of the shares to server 9, since that will be
1639             # the first one that the selector sees.
1640             for i in xrange(10):
1641                 self._copy_share_to_server(i, 9)
1642             # Remove server 0, and its contents
1643             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1644             # Make happiness unsatisfiable
1645             c = self.g.clients[0]
1646             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1647             return c
1648         d.addCallback(_next)
1649         d.addCallback(lambda c:
1650             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1651                             "1 queries placed some shares",
1652                             c.upload, upload.Data("data" * 10000,
1653                                                   convergence="")))
1654         return d
1655
1656
1657     def test_upper_limit_on_readonly_queries(self):
1658         self.basedir = self.mktemp()
1659         d = self._setup_and_upload()
1660         def _then(ign):
1661             for i in xrange(1, 11):
1662                 self._add_server(server_number=i, readonly=True)
1663             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1664             c = self.g.clients[0]
1665             c.DEFAULT_ENCODING_PARAMETERS['k'] = 2
1666             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1667             c.DEFAULT_ENCODING_PARAMETERS['n'] = 4
1668             return c
1669         d.addCallback(_then)
1670         d.addCallback(lambda client:
1671             self.shouldFail(UploadUnhappinessError,
1672                             "test_upper_limit_on_readonly_queries",
1673                             "sent 8 queries to 8 servers",
1674                             client.upload,
1675                             upload.Data('data' * 10000, convergence="")))
1676         return d
1677
1678
1679     def test_exception_messages_during_server_selection(self):
1680         # server 1: read-only, no shares
1681         # server 2: read-only, no shares
1682         # server 3: read-only, no shares
1683         # server 4: read-only, no shares
1684         # server 5: read-only, no shares
1685         # This will fail, but we want to make sure that the log messages
1686         # are informative about why it has failed.
1687         self.basedir = self.mktemp()
1688         d = self._setup_and_upload()
1689         d.addCallback(lambda ign:
1690             self._add_server(server_number=1, readonly=True))
1691         d.addCallback(lambda ign:
1692             self._add_server(server_number=2, readonly=True))
1693         d.addCallback(lambda ign:
1694             self._add_server(server_number=3, readonly=True))
1695         d.addCallback(lambda ign:
1696             self._add_server(server_number=4, readonly=True))
1697         d.addCallback(lambda ign:
1698             self._add_server(server_number=5, readonly=True))
1699         d.addCallback(lambda ign:
1700             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1701         def _reset_encoding_parameters(ign, happy=4):
1702             client = self.g.clients[0]
1703             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1704             return client
1705         d.addCallback(_reset_encoding_parameters)
1706         d.addCallback(lambda client:
1707             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1708                             "placed 0 shares out of 10 "
1709                             "total (10 homeless), want to place shares on at "
1710                             "least 4 servers such that any 3 of them have "
1711                             "enough shares to recover the file, "
1712                             "sent 5 queries to 5 servers, 0 queries placed "
1713                             "some shares, 5 placed none "
1714                             "(of which 5 placed none due to the server being "
1715                             "full and 0 placed none due to an error)",
1716                             client.upload,
1717                             upload.Data("data" * 10000, convergence="")))
1718
1719
1720         # server 1: read-only, no shares
1721         # server 2: broken, no shares
1722         # server 3: read-only, no shares
1723         # server 4: read-only, no shares
1724         # server 5: read-only, no shares
1725         def _reset(ign):
1726             self.basedir = self.mktemp()
1727         d.addCallback(_reset)
1728         d.addCallback(lambda ign:
1729             self._setup_and_upload())
1730         d.addCallback(lambda ign:
1731             self._add_server(server_number=1, readonly=True))
1732         d.addCallback(lambda ign:
1733             self._add_server(server_number=2))
1734         def _break_server_2(ign):
1735             serverid = self.g.servers_by_number[2].my_nodeid
1736             self.g.break_server(serverid)
1737         d.addCallback(_break_server_2)
1738         d.addCallback(lambda ign:
1739             self._add_server(server_number=3, readonly=True))
1740         d.addCallback(lambda ign:
1741             self._add_server(server_number=4, readonly=True))
1742         d.addCallback(lambda ign:
1743             self._add_server(server_number=5, readonly=True))
1744         d.addCallback(lambda ign:
1745             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1746         d.addCallback(_reset_encoding_parameters)
1747         d.addCallback(lambda client:
1748             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1749                             "placed 0 shares out of 10 "
1750                             "total (10 homeless), want to place shares on at "
1751                             "least 4 servers such that any 3 of them have "
1752                             "enough shares to recover the file, "
1753                             "sent 5 queries to 5 servers, 0 queries placed "
1754                             "some shares, 5 placed none "
1755                             "(of which 4 placed none due to the server being "
1756                             "full and 1 placed none due to an error)",
1757                             client.upload,
1758                             upload.Data("data" * 10000, convergence="")))
1759         # server 0, server 1 = empty, accepting shares
1760         # This should place all of the shares, but still fail with happy=4.
1761         # We want to make sure that the exception message is worded correctly.
1762         d.addCallback(_reset)
1763         d.addCallback(lambda ign:
1764             self._setup_grid())
1765         d.addCallback(lambda ign:
1766             self._add_server(server_number=1))
1767         d.addCallback(_reset_encoding_parameters)
1768         d.addCallback(lambda client:
1769             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1770                             "shares could be placed or found on only 2 "
1771                             "server(s). We were asked to place shares on at "
1772                             "least 4 server(s) such that any 3 of them have "
1773                             "enough shares to recover the file.",
1774                             client.upload, upload.Data("data" * 10000,
1775                                                        convergence="")))
1776         # servers 0 - 4 = empty, accepting shares
1777         # This too should place all the shares, and this too should fail,
1778         # but since the effective happiness is more than the k encoding
1779         # parameter, it should trigger a different error message than the one
1780         # above.
1781         d.addCallback(_reset)
1782         d.addCallback(lambda ign:
1783             self._setup_grid())
1784         d.addCallback(lambda ign:
1785             self._add_server(server_number=1))
1786         d.addCallback(lambda ign:
1787             self._add_server(server_number=2))
1788         d.addCallback(lambda ign:
1789             self._add_server(server_number=3))
1790         d.addCallback(lambda ign:
1791             self._add_server(server_number=4))
1792         d.addCallback(_reset_encoding_parameters, happy=7)
1793         d.addCallback(lambda client:
1794             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1795                             "shares could be placed on only 5 server(s) such "
1796                             "that any 3 of them have enough shares to recover "
1797                             "the file, but we were asked to place shares on "
1798                             "at least 7 such servers.",
1799                             client.upload, upload.Data("data" * 10000,
1800                                                        convergence="")))
1801         # server 0: shares 0 - 9
1802         # server 1: share 0, read-only
1803         # server 2: share 0, read-only
1804         # server 3: share 0, read-only
1805         # This should place all of the shares, but fail with happy=4.
1806         # Since the number of servers with shares is more than the number
1807         # necessary to reconstitute the file, this will trigger a different
1808         # error message than either of those above.
1809         d.addCallback(_reset)
1810         d.addCallback(lambda ign:
1811             self._setup_and_upload())
1812         d.addCallback(lambda ign:
1813             self._add_server_with_share(server_number=1, share_number=0,
1814                                         readonly=True))
1815         d.addCallback(lambda ign:
1816             self._add_server_with_share(server_number=2, share_number=0,
1817                                         readonly=True))
1818         d.addCallback(lambda ign:
1819             self._add_server_with_share(server_number=3, share_number=0,
1820                                         readonly=True))
1821         d.addCallback(_reset_encoding_parameters, happy=7)
1822         d.addCallback(lambda client:
1823             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1824                             "shares could be placed or found on 4 server(s), "
1825                             "but they are not spread out evenly enough to "
1826                             "ensure that any 3 of these servers would have "
1827                             "enough shares to recover the file. We were asked "
1828                             "to place shares on at least 7 servers such that "
1829                             "any 3 of them have enough shares to recover the "
1830                             "file",
1831                             client.upload, upload.Data("data" * 10000,
1832                                                        convergence="")))
1833         return d
1834
1835
1836     def test_problem_layout_comment_187(self):
1837         # #778 comment 187 broke an initial attempt at a share
1838         # redistribution algorithm. This test is here to demonstrate the
1839         # breakage, and to test that subsequent algorithms don't also
1840         # break in the same way.
1841         self.basedir = self.mktemp()
1842         d = self._setup_and_upload(k=2, n=3)
1843
1844         # server 1: shares 0, 1, 2, readonly
1845         # server 2: share 0, readonly
1846         # server 3: share 0
1847         def _setup(ign):
1848             self._add_server_with_share(server_number=1, share_number=0,
1849                                         readonly=True)
1850             self._add_server_with_share(server_number=2, share_number=0,
1851                                         readonly=True)
1852             self._add_server_with_share(server_number=3, share_number=0)
1853             # Copy shares
1854             self._copy_share_to_server(1, 1)
1855             self._copy_share_to_server(2, 1)
1856             # Remove server 0
1857             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1858             client = self.g.clients[0]
1859             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 3
1860             return client
1861
1862         d.addCallback(_setup)
1863         d.addCallback(lambda client:
1864             client.upload(upload.Data("data" * 10000, convergence="")))
1865         d.addCallback(lambda ign:
1866             self.failUnless(self._has_happy_share_distribution()))
1867         return d
1868     test_problem_layout_comment_187.todo = "this isn't fixed yet"
1869
1870     def test_problem_layout_ticket_1118(self):
1871         # #1118 includes a report from a user who hit an assertion in
1872         # the upload code with this layout.
1873         self.basedir = self.mktemp()
1874         d = self._setup_and_upload(k=2, n=4)
1875
1876         # server 0: no shares
1877         # server 1: shares 0, 3
1878         # server 3: share 1
1879         # server 2: share 2
1880         # The order that they get queries is 0, 1, 3, 2
1881         def _setup(ign):
1882             self._add_server(server_number=0)
1883             self._add_server_with_share(server_number=1, share_number=0)
1884             self._add_server_with_share(server_number=2, share_number=2)
1885             self._add_server_with_share(server_number=3, share_number=1)
1886             # Copy shares
1887             self._copy_share_to_server(3, 1)
1888             storedir = self.get_serverdir(0)
1889             # remove the storedir, wiping out any existing shares
1890             shutil.rmtree(storedir)
1891             # create an empty storedir to replace the one we just removed
1892             os.mkdir(storedir)
1893             client = self.g.clients[0]
1894             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1895             return client
1896
1897         d.addCallback(_setup)
1898         # Note: actually it should succeed! See
1899         # test_problem_layout_ticket_1128. But ticket 1118 is just to
1900         # make it realize that it has failed, so if it raises
1901         # UploadUnhappinessError then we'll give it the green light
1902         # for now.
1903         d.addCallback(lambda ignored:
1904             self.shouldFail(UploadUnhappinessError,
1905                             "test_problem_layout_ticket_1118",
1906                             "",
1907                             self.g.clients[0].upload, upload.Data("data" * 10000,
1908                                                        convergence="")))
1909         return d
1910
1911     def test_problem_layout_ticket_1128(self):
1912         # #1118 includes a report from a user who hit an assertion in
1913         # the upload code with this layout.
1914         self.basedir = self.mktemp()
1915         d = self._setup_and_upload(k=2, n=4)
1916
1917         # server 0: no shares
1918         # server 1: shares 0, 3
1919         # server 3: share 1
1920         # server 2: share 2
1921         # The order that they get queries is 0, 1, 3, 2
1922         def _setup(ign):
1923             self._add_server(server_number=0)
1924             self._add_server_with_share(server_number=1, share_number=0)
1925             self._add_server_with_share(server_number=2, share_number=2)
1926             self._add_server_with_share(server_number=3, share_number=1)
1927             # Copy shares
1928             self._copy_share_to_server(3, 1)
1929             storedir = self.get_serverdir(0)
1930             # remove the storedir, wiping out any existing shares
1931             shutil.rmtree(storedir)
1932             # create an empty storedir to replace the one we just removed
1933             os.mkdir(storedir)
1934             client = self.g.clients[0]
1935             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1936             return client
1937
1938         d.addCallback(_setup)
1939         d.addCallback(lambda client:
1940                           client.upload(upload.Data("data" * 10000, convergence="")))
1941         d.addCallback(lambda ign:
1942             self.failUnless(self._has_happy_share_distribution()))
1943         return d
1944     test_problem_layout_ticket_1128.todo = "Invent a smarter uploader that uploads successfully in this case."
1945
1946     def test_upload_succeeds_with_some_homeless_shares(self):
1947         # If the upload is forced to stop trying to place shares before
1948         # it has placed (or otherwise accounted) for all of them, but it
1949         # has placed enough to satisfy the upload health criteria that
1950         # we're using, it should still succeed.
1951         self.basedir = self.mktemp()
1952         d = self._setup_and_upload()
1953         def _server_setup(ign):
1954             # Add four servers so that we have a layout like this:
1955             # server 1: share 0, read-only
1956             # server 2: share 1, read-only
1957             # server 3: share 2, read-only
1958             # server 4: share 3, read-only
1959             # If we set happy = 4, the upload will manage to satisfy
1960             # servers of happiness, but not place all of the shares; we
1961             # want to test that the upload is declared successful in
1962             # this case.
1963             self._add_server_with_share(server_number=1, share_number=0,
1964                                         readonly=True)
1965             self._add_server_with_share(server_number=2, share_number=1,
1966                                         readonly=True)
1967             self._add_server_with_share(server_number=3, share_number=2,
1968                                         readonly=True)
1969             self._add_server_with_share(server_number=4, share_number=3,
1970                                         readonly=True)
1971             # Remove server 0.
1972             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1973             # Set the client appropriately
1974             c = self.g.clients[0]
1975             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1976             return c
1977         d.addCallback(_server_setup)
1978         d.addCallback(lambda client:
1979             client.upload(upload.Data("data" * 10000, convergence="")))
1980         d.addCallback(lambda ign:
1981             self.failUnless(self._has_happy_share_distribution()))
1982         return d
1983
1984
1985     def test_uploader_skips_over_servers_with_only_one_share(self):
1986         # We want to make sure that the redistribution logic ignores
1987         # servers with only one share, since placing these shares
1988         # elsewhere will at best keep happiness the same as it was, and
1989         # at worst hurt it.
1990         self.basedir = self.mktemp()
1991         d = self._setup_and_upload()
1992         def _server_setup(ign):
1993             # Add some servers so that the upload will need to
1994             # redistribute, but will first pass over a couple of servers
1995             # that don't have enough shares to redistribute before
1996             # finding one that does have shares to redistribute.
1997             self._add_server_with_share(server_number=1, share_number=0)
1998             self._add_server_with_share(server_number=2, share_number=2)
1999             self._add_server_with_share(server_number=3, share_number=1)
2000             self._add_server_with_share(server_number=8, share_number=4)
2001             self._add_server_with_share(server_number=5, share_number=5)
2002             self._add_server_with_share(server_number=10, share_number=7)
2003             for i in xrange(4):
2004                 self._copy_share_to_server(i, 2)
2005             return self.g.clients[0]
2006         d.addCallback(_server_setup)
2007         d.addCallback(lambda client:
2008             client.upload(upload.Data("data" * 10000, convergence="")))
2009         d.addCallback(lambda ign:
2010             self.failUnless(self._has_happy_share_distribution()))
2011         return d
2012
2013
2014     def test_server_selector_bucket_abort(self):
2015         # If server selection for an upload fails due to an unhappy
2016         # layout, the server selection process should abort the buckets it
2017         # allocates before failing, so that the space can be re-used.
2018         self.basedir = self.mktemp()
2019         self.set_up_grid(num_servers=5)
2020
2021         # Try to upload a file with happy=7, which is unsatisfiable with
2022         # the current grid. This will fail, but should not take up any
2023         # space on the storage servers after it fails.
2024         client = self.g.clients[0]
2025         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
2026         d = defer.succeed(None)
2027         d.addCallback(lambda ignored:
2028             self.shouldFail(UploadUnhappinessError,
2029                             "test_server_selection_bucket_abort",
2030                             "",
2031                             client.upload, upload.Data("data" * 10000,
2032                                                        convergence="")))
2033         # wait for the abort messages to get there.
2034         def _turn_barrier(res):
2035             return fireEventually(res)
2036         d.addCallback(_turn_barrier)
2037         def _then(ignored):
2038             for server in self.g.servers_by_number.values():
2039                 self.failUnlessEqual(server.allocated_size(), 0)
2040         d.addCallback(_then)
2041         return d
2042
2043
2044     def test_encoder_bucket_abort(self):
2045         # If enough servers die in the process of encoding and uploading
2046         # a file to make the layout unhappy, we should cancel the
2047         # newly-allocated buckets before dying.
2048         self.basedir = self.mktemp()
2049         self.set_up_grid(num_servers=4)
2050
2051         client = self.g.clients[0]
2052         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
2053
2054         d = defer.succeed(None)
2055         d.addCallback(lambda ignored:
2056             self.shouldFail(UploadUnhappinessError,
2057                             "test_encoder_bucket_abort",
2058                             "",
2059                             self._do_upload_with_broken_servers, 1))
2060         def _turn_barrier(res):
2061             return fireEventually(res)
2062         d.addCallback(_turn_barrier)
2063         def _then(ignored):
2064             for server in self.g.servers_by_number.values():
2065                 self.failUnlessEqual(server.allocated_size(), 0)
2066         d.addCallback(_then)
2067         return d
2068
2069
2070     def _set_up_nodes_extra_config(self, clientdir):
2071         cfgfn = os.path.join(clientdir, "tahoe.cfg")
2072         oldcfg = open(cfgfn, "r").read()
2073         f = open(cfgfn, "wt")
2074         f.write(oldcfg)
2075         f.write("\n")
2076         f.write("[client]\n")
2077         f.write("shares.needed = 7\n")
2078         f.write("shares.total = 12\n")
2079         f.write("\n")
2080         f.close()
2081         return None
2082
2083 # TODO:
2084 #  upload with exactly 75 servers (shares_of_happiness)
2085 #  have a download fail
2086 #  cancel a download (need to implement more cancel stuff)
2087
2088 # from test_encode:
2089 # NoNetworkGrid, upload part of ciphertext, kill server, continue upload
2090 # check with Kevan, they want to live in test_upload, existing tests might cover
2091 #     def test_lost_one_shareholder(self): # these are upload-side tests
2092 #     def test_lost_one_shareholder_early(self):
2093 #     def test_lost_many_shareholders(self):
2094 #     def test_lost_all_shareholders(self):