]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_upload.py
Teach StorageFarmBroker to fire a deferred when a connection threshold is reached...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_upload.py
1 # -*- coding: utf-8 -*-
2
3 import os, shutil
4 from cStringIO import StringIO
5 from twisted.trial import unittest
6 from twisted.python.failure import Failure
7 from twisted.internet import defer
8 from foolscap.api import fireEventually
9
10 import allmydata # for __full_version__
11 from allmydata import uri, monitor, client
12 from allmydata.immutable import upload, encode
13 from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
14 from allmydata.util import log, base32
15 from allmydata.util.assertutil import precondition
16 from allmydata.util.deferredutil import DeferredListShouldSucceed
17 from allmydata.test.no_network import GridTestMixin
18 from allmydata.test.common_util import ShouldFailMixin
19 from allmydata.util.happinessutil import servers_of_happiness, \
20                                          shares_by_server, merge_servers
21 from allmydata.storage_client import StorageFarmBroker
22 from allmydata.storage.server import storage_index_to_dir
23 from allmydata.client import Client
24
MiB = 1024*1024  # bytes per mebibyte (2**20); used for segment-size parameters
26
def extract_uri(results):
    """Deferred-callback helper: pull the URI string out of an
    UploadResults instance."""
    uri_string = results.get_uri()
    return uri_string
29
# Some of these took longer than 480 seconds on Zandr's arm box, but this may
# have been due to an earlier test ERROR'ing out due to timeout, which seems
# to screw up subsequent tests.
timeout = 960  # per-test timeout in seconds; twisted.trial reads this module attribute
34
class Uploadable(unittest.TestCase):
    """Exercise the basic IUploadable implementations (FileHandle,
    FileName, Data): size reporting, chunked reads, and close()."""

    def shouldEqual(self, data, expected):
        # read() returns a list of strings; check the types, then flatten
        # and compare against the expected bytes.
        self.failUnless(isinstance(data, list))
        for chunk in data:
            self.failUnless(isinstance(chunk, str))
        self.failUnlessEqual("".join(data), expected)

    def test_filehandle_random_key(self):
        return self._test_filehandle(convergence=None)

    def test_filehandle_convergent_encryption(self):
        return self._test_filehandle(convergence="some convergence string")

    def _test_filehandle(self, convergence):
        fh = StringIO("a"*41)
        uploadable = upload.FileHandle(fh, convergence=convergence)
        d = uploadable.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        # read a single byte, then ask for more than remains
        d.addCallback(lambda ign: uploadable.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda ign: uploadable.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda ign: uploadable.close()) # this doesn't close the filehandle
        d.addCallback(lambda ign: fh.close()) # that privilege is reserved for us
        return d

    def test_filename(self):
        basedir = "upload/Uploadable/test_filename"
        os.makedirs(basedir)
        path = os.path.join(basedir, "file")
        f = open(path, "w")
        f.write("a"*41)
        f.close()
        uploadable = upload.FileName(path, convergence=None)
        d = uploadable.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda ign: uploadable.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda ign: uploadable.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda ign: uploadable.close())
        return d

    def test_data(self):
        contents = "a"*41
        uploadable = upload.Data(contents, convergence=None)
        d = uploadable.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda ign: uploadable.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda ign: uploadable.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda ign: uploadable.close())
        return d
90
class ServerError(Exception):
    """Raised by FakeStorageServer to simulate a remote server failing an
    allocate_buckets query."""
    pass
93
class SetDEPMixin:
    """Mixin for tests that need to override the client node's default
    encoding parameters (k / happy / n / max segment size)."""
    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        self.node.encoding_params = {
            "k": k,
            "happy": happy,
            "n": n,
            "max_segment_size": max_segsize,
        }
102
class FakeStorageServer:
    """In-memory stand-in for a remote storage server.

    The 'mode' string controls allocate_buckets():
      "full"             -- refuse every share
      "already got them" -- claim all requested shares are already present
      "small"            -- advertise a 10-byte maximum share size
      "first-fail"       -- raise ServerError on the first query
      "second-fail"      -- raise ServerError on the second query
      anything else      -- accept every share
    """
    def __init__(self, mode):
        self.mode = mode
        self.allocated = []  # list of (storage_index, shnum) we accepted
        self.queries = 0     # number of allocate_buckets calls seen
        share_limit = 2**32 - 1
        if mode == "small":
            share_limit = 10
        self.version = {
            "http://allmydata.org/tahoe/protocols/storage/v1":
            { "maximum-immutable-share-size": share_limit },
            "application-version": str(allmydata.__full_version__),
            }

    def callRemote(self, methname, *args, **kwargs):
        # dispatch to the local method of the same name, but on a later
        # reactor turn, like a real foolscap remote call would
        d = fireEventually()
        d.addCallback(lambda ign: getattr(self, methname)(*args, **kwargs))
        return d

    def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
                         sharenums, share_size, canary):
        if self.mode == "first-fail" and self.queries == 0:
            raise ServerError
        if self.mode == "second-fail" and self.queries == 1:
            raise ServerError
        self.queries += 1
        if self.mode == "full":
            return (set(), {})
        if self.mode == "already got them":
            return (set(sharenums), {})
        # default: accept everything, handing back a writer per share
        writers = {}
        for shnum in sharenums:
            self.allocated.append( (storage_index, shnum) )
            writers[shnum] = FakeBucketWriter(share_size)
        return (set(), writers)
148
class FakeBucketWriter:
    """Diagnostic stand-in for storageserver.BucketWriter: accumulates
    writes into a StringIO and remembers whether close() happened."""
    def __init__(self, size):
        self.data = StringIO()
        self.closed = False
        self._size = size  # writes beyond this offset are rejected

    def callRemote(self, methname, *args, **kwargs):
        # route to the matching remote_* method on a later turn
        d = fireEventually()
        d.addCallback(lambda ign:
                      getattr(self, "remote_" + methname)(*args, **kwargs))
        return d

    def callRemoteOnly(self, methname, *args, **kwargs):
        # fire-and-forget: schedule the call but discard the Deferred
        self.callRemote(methname, *args, **kwargs)
        return None

    def remote_write(self, offset, data):
        precondition(not self.closed)
        precondition(offset >= 0)
        precondition(offset+len(data) <= self._size,
                     "offset=%d + data=%d > size=%d" %
                     (offset, len(data), self._size))
        self.data.seek(offset)
        self.data.write(data)

    def remote_close(self):
        precondition(not self.closed)
        self.closed = True

    def remote_abort(self):
        pass
186
class FakeClient:
    """Minimal stand-in for a client node: carries encoding parameters
    and a StorageFarmBroker populated with FakeStorageServers."""
    DEFAULT_ENCODING_PARAMETERS = {"k":25,
                                   "happy": 25,
                                   "n": 100,
                                   "max_segment_size": 1*MiB,
                                   }

    def __init__(self, mode="good", num_servers=50):
        self.num_servers = num_servers
        self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()
        # a bare string means "every server behaves this way"
        if type(mode) is str:
            mode = dict((i, mode) for i in range(num_servers))
        self.storage_broker = StorageFarmBroker(None, True, 0, None)
        self.last_servers = []
        for fakeid in range(self.num_servers):
            serverid = "%20d" % fakeid
            rref = FakeStorageServer(mode[fakeid])
            ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid),
                   "permutation-seed-base32": base32.b2a(serverid) }
            self.storage_broker.test_add_rref(serverid, rref, ann)
            self.last_servers.append(rref)

    def log(self, *args, **kwargs):
        pass

    def get_encoding_parameters(self):
        return self.encoding_params

    def get_storage_broker(self):
        return self.storage_broker

    _secret_holder = client.SecretHolder("lease secret", "convergence secret")
215
class GotTooFarError(Exception):
    """Raised by GiganticUploadable.read() when the upload reads more
    data than the test expects it to (i.e. the size limit was not
    enforced early enough)."""
    pass
218
class GiganticUploadable(upload.FileHandle):
    """An uploadable that pretends to be 'size' bytes of zeros without
    holding them in memory; blows up if more than ~1MB is actually read."""
    def __init__(self, size):
        self._size = size
        self._fp = 0  # current read position

    def get_encryption_key(self):
        return defer.succeed("\x00" * 16)

    def get_size(self):
        return defer.succeed(self._size)

    def read(self, length):
        remaining = self._size - self._fp
        if length > remaining:
            length = remaining
        self._fp += length
        if self._fp > 1000000:
            # terminate the test early.
            raise GotTooFarError("we shouldn't be allowed to get this far")
        return defer.succeed(["\x00" * length])

    def close(self):
        pass
238
DATA = """
Once upon a time, there was a beautiful princess named Buttercup. She lived
in a magical land where every file was stored securely among millions of
machines, and nobody ever worried about their data being lost ever again.
The End.
"""
# DATA must be large enough to force a CHK upload rather than an inline LIT URI
assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD

SIZE_ZERO = 0
SIZE_SMALL = 16       # below the LIT threshold
SIZE_LARGE = len(DATA)
250
def upload_data(uploader, data):
    """Upload a string via upload.Data; returns a Deferred firing with
    the UploadResults."""
    u = upload.Data(data, convergence=None)
    return uploader.upload(u)
def upload_filename(uploader, filename):
    """Upload the contents of a named file via upload.FileName."""
    u = upload.FileName(filename, convergence=None)
    return uploader.upload(u)
def upload_filehandle(uploader, fh):
    """Upload from an open filehandle via upload.FileHandle."""
    u = upload.FileHandle(fh, convergence=None)
    return uploader.upload(u)
260
class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
    """Uploads against 50 well-behaved fake servers: small uploads should
    yield LIT URIs, large ones CHK URIs, and files over the 2**64
    container limit should be rejected."""
    def setUp(self):
        self.node = FakeClient(mode="good")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _check_small(self, newuri, size):
        # small uploads are stored inline in a LIT URI
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.LiteralFileURI))
        self.failUnlessEqual(len(u.data), size)

    def _check_large(self, newuri, size):
        # large uploads produce a CHK URI with 16-byte SI and key
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def get_data(self, size):
        return DATA[:size]

    def test_too_large(self):
        # we've removed the 4GiB share size limit (see ticket #346 for
        # details), but still have an 8-byte field, so the limit is now
        # 2**64, so make sure we reject files larger than that.
        k = 3; happy = 7; n = 10
        self.set_encoding_parameters(k, happy, n)
        big = k*(2**64)
        data1 = GiganticUploadable(big)
        d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
                            "This file is too large to be uploaded (data_size)",
                            self.u.upload, data1)
        data2 = GiganticUploadable(big-3)
        d.addCallback(lambda res:
                      self.shouldFail(FileTooLargeError,
                                      "test_too_large-data2",
                                      "This file is too large to be uploaded (offsets)",
                                      self.u.upload, data2))
        # I don't know where the actual limit is.. it depends upon how large
        # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
        return d

    def test_data_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_data_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_data_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_data_large_odd_segments(self):
        data = self.get_data(SIZE_LARGE)
        segsize = int(SIZE_LARGE / 2.5)
        # we want 3 segments, since that's not a power of two
        self.set_encoding_parameters(25, 25, 100, segsize)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_filehandle_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_filehandle_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_filehandle_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_filename_zero(self):
        fn = "Uploader-test_filename_zero.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_ZERO)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_filename_small(self):
        fn = "Uploader-test_filename_small.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_SMALL)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_filename_large(self):
        fn = "Uploader-test_filename_large.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_LARGE)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d
390
class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
    """Uploads where some or all servers raise ServerError on their first
    or second allocate_buckets query."""
    def make_node(self, mode, num_servers=10):
        self.node = FakeClient(mode, num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _check_large(self, newuri, size):
        # large uploads produce a CHK URI with 16-byte SI and key
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def test_first_error(self):
        # one good server out of ten is enough when happy=1
        mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
        self.make_node(mode)
        self.set_encoding_parameters(k=25, happy=1, n=50)
        d = upload_data(self.u, DATA)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_first_error_all(self):
        # every server fails its first query: the upload must give up
        self.make_node("first-fail")
        d = self.shouldFail(UploadUnhappinessError, "first_error_all",
                            "server selection failed",
                            upload_data, self.u, DATA)
        def _check((f,)):
            self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
            # there should also be a 'last failure was' message
            self.failUnlessIn("ServerError", str(f.value))
        d.addCallback(_check)
        return d

    def test_second_error(self):
        # we want to make sure we make it to a third pass. This means that
        # the first pass was insufficient to place all shares, and at least
        # one of second pass servers (other than the last one) accepted a
        # share (so we'll believe that a third pass will be useful). (if
        # everyone but the last server throws an error, then we'll send all
        # the remaining shares to the last server at the end of the second
        # pass, and if that succeeds, we won't make it to a third pass).
        #
        # we can achieve this 97.5% of the time by using 40 servers, having
        # 39 of them fail on the second request, leaving only one to succeed
        # on the second request. (we need to keep the number of servers low
        # enough to ensure a second pass with 100 shares).
        mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
        self.make_node(mode, 40)
        d = upload_data(self.u, DATA)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_second_error_all(self):
        # every server fails its second query: first pass places 10 shares,
        # then everything falls over
        self.make_node("second-fail")
        d = self.shouldFail(UploadUnhappinessError, "second_error_all",
                            "server selection failed",
                            upload_data, self.u, DATA)
        def _check((f,)):
            self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
            # there should also be a 'last failure was' message
            self.failUnlessIn("ServerError", str(f.value))
        d.addCallback(_check)
        return d
459
class FullServer(unittest.TestCase):
    """When every server is full, a large upload must fail with
    UploadUnhappinessError."""
    def setUp(self):
        self.node = FakeClient(mode="full")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _should_fail(self, f):
        # the upload must errback with UploadUnhappinessError
        failed_as_expected = (isinstance(f, Failure)
                              and f.check(UploadUnhappinessError))
        self.failUnless(failed_as_expected, f)

    def test_data_large(self):
        d = upload_data(self.u, DATA)
        d.addBoth(self._should_fail)
        return d
475
class ServerSelection(unittest.TestCase):
    """Check how shares are distributed across servers: the selector
    should spread shares as evenly as the servers allow."""

    def make_client(self, num_servers=50):
        self.node = FakeClient(mode="good", num_servers=num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def get_data(self, size):
        return DATA[:size]

    def _check_large(self, newuri, size):
        # large uploads produce a CHK URI with 16-byte SI and key
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        # NOTE(review): duplicates SetDEPMixin.set_encoding_parameters;
        # this class simply doesn't inherit the mixin.
        p = {"k": k,
             "happy": happy,
             "n": n,
             "max_segment_size": max_segsize,
             }
        self.node.encoding_params = p

    def test_one_each(self):
        # if we have 50 shares, and there are 50 servers, and they all accept
        # a share, we should get exactly one share per server

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(25, 30, 50)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for s in self.node.last_servers:
                allocated = s.allocated
                self.failUnlessEqual(len(allocated), 1)
                self.failUnlessEqual(s.queries, 1)
        d.addCallback(_check)
        return d

    def test_two_each(self):
        # if we have 100 shares, and there are 50 servers, and they all
        # accept all shares, we should get exactly two shares per server

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        # if there are 50 servers, then happy needs to be <= 50
        self.set_encoding_parameters(50, 50, 100)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for s in self.node.last_servers:
                allocated = s.allocated
                self.failUnlessEqual(len(allocated), 2)
                self.failUnlessEqual(s.queries, 2)
        d.addCallback(_check)
        return d

    def test_one_each_plus_one_extra(self):
        # if we have 51 shares, and there are 50 servers, then one server
        # gets two shares and the rest get just one

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(24, 41, 51)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            got_one = []
            got_two = []
            for s in self.node.last_servers:
                allocated = s.allocated
                self.failUnless(len(allocated) in (1,2), len(allocated))
                if len(allocated) == 1:
                    self.failUnlessEqual(s.queries, 1)
                    got_one.append(s)
                else:
                    self.failUnlessEqual(s.queries, 2)
                    got_two.append(s)
            self.failUnlessEqual(len(got_one), 49)
            self.failUnlessEqual(len(got_two), 1)
        d.addCallback(_check)
        return d

    def test_four_each(self):
        # if we have 200 shares, and there are 50 servers, then each server
        # gets 4 shares. The design goal is to accomplish this with only two
        # queries per server.

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        # if there are 50 servers, then happy should be no more than 50 if we
        # want this to work.
        self.set_encoding_parameters(100, 50, 200)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for s in self.node.last_servers:
                allocated = s.allocated
                self.failUnlessEqual(len(allocated), 4)
                self.failUnlessEqual(s.queries, 2)
        d.addCallback(_check)
        return d

    def test_three_of_ten(self):
        # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
        # 4+4+2

        self.make_client(3)
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 3, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            counts = {}
            for s in self.node.last_servers:
                allocated = s.allocated
                counts[len(allocated)] = counts.get(len(allocated), 0) + 1
            histogram = [counts.get(i, 0) for i in range(5)]
            # two servers hold 3 shares, one holds 4
            self.failUnlessEqual(histogram, [0,0,0,2,1])
        d.addCallback(_check)
        return d

    def test_some_big_some_small(self):
        # 10 shares, 20 servers, but half the servers don't support a
        # share-size large enough for our file
        mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)])
        self.node = FakeClient(mode, num_servers=20)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 5, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            # we should have put one share each on the big servers, and zero
            # shares on the small servers
            total_allocated = 0
            for p in self.node.last_servers:
                if p.mode == "good":
                    self.failUnlessEqual(len(p.allocated), 1)
                elif p.mode == "small":
                    self.failUnlessEqual(len(p.allocated), 0)
                total_allocated += len(p.allocated)
            self.failUnlessEqual(total_allocated, 10)
        d.addCallback(_check)
        return d
636
637
class StorageIndex(unittest.TestCase):
    """The storage index must be a function of the file contents, the
    convergence secret, and the encoding parameters (for CHK), or random
    when convergence=None."""
    def test_params_must_matter(self):
        DATA = "I am some data"
        PARAMS = Client.DEFAULT_ENCODING_PARAMETERS

        u = upload.Data(DATA, convergence="")
        u.set_default_encoding_parameters(PARAMS)
        eu = upload.EncryptAnUploadable(u)
        d1 = eu.get_storage_index()

        # CHK means the same data should encrypt the same way
        u = upload.Data(DATA, convergence="")
        u.set_default_encoding_parameters(PARAMS)
        eu = upload.EncryptAnUploadable(u)
        d1a = eu.get_storage_index()

        # but if we use a different convergence string it should be different
        u = upload.Data(DATA, convergence="wheee!")
        u.set_default_encoding_parameters(PARAMS)
        eu = upload.EncryptAnUploadable(u)
        d1salt1 = eu.get_storage_index()

        # and if we add yet a different convergence it should be different again
        u = upload.Data(DATA, convergence="NOT wheee!")
        u.set_default_encoding_parameters(PARAMS)
        eu = upload.EncryptAnUploadable(u)
        d1salt2 = eu.get_storage_index()

        # and if we use the first string again it should be the same as last time
        u = upload.Data(DATA, convergence="wheee!")
        u.set_default_encoding_parameters(PARAMS)
        eu = upload.EncryptAnUploadable(u)
        d1salt1a = eu.get_storage_index()

        # and if we change the encoding parameters, it should be different (from the same convergence string with different encoding parameters)
        u = upload.Data(DATA, convergence="")
        u.set_default_encoding_parameters(PARAMS)
        u.encoding_param_k = u.default_encoding_param_k + 1
        eu = upload.EncryptAnUploadable(u)
        d2 = eu.get_storage_index()

        # and if we use a random key, it should be different than the CHK
        u = upload.Data(DATA, convergence=None)
        u.set_default_encoding_parameters(PARAMS)
        eu = upload.EncryptAnUploadable(u)
        d3 = eu.get_storage_index()
        # and different from another instance
        u = upload.Data(DATA, convergence=None)
        u.set_default_encoding_parameters(PARAMS)
        eu = upload.EncryptAnUploadable(u)
        d4 = eu.get_storage_index()

        d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
        def _done(res):
            # pairwise compare: equal inputs give equal SIs, any difference
            # in convergence/params/randomness gives different SIs
            si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
            self.failUnlessEqual(si1, si1a)
            self.failIfEqual(si1, si2)
            self.failIfEqual(si1, si3)
            self.failIfEqual(si1, si4)
            self.failIfEqual(si3, si4)
            self.failIfEqual(si1salt1, si1)
            self.failIfEqual(si1salt1, si1salt2)
            self.failIfEqual(si1salt2, si1)
            self.failUnlessEqual(si1salt1, si1salt1a)
        d.addCallback(_done)
        return d
704
# copied from python docs because itertools.combinations was added in
# python 2.6 and we support >= 2.4.
def combinations(iterable, r):
    """Yield all r-length subsequences of elements from 'iterable', as
    tuples, in lexicographic order of their indices.

    combinations('ABCD', 2) --> AB AC AD BC BD CD
    combinations(range(4), 3) --> 012 013 023 123
    """
    pool = tuple(iterable)
    n = len(pool)
    if r > n:
        return
    # list() so the index vector is mutable: a bare range() would not
    # support item assignment on Python 3
    indices = list(range(r))
    yield tuple(pool[i] for i in indices)
    while True:
        # find the rightmost index that can still be advanced
        for i in reversed(range(r)):
            if indices[i] != i + n - r:
                break
        else:
            # every index is at its maximum: we're done
            return
        indices[i] += 1
        # reset everything to the right of i to its smallest legal value
        for j in range(i+1, r):
            indices[j] = indices[j-1] + 1
        yield tuple(pool[i] for i in indices)
726
def is_happy_enough(servertoshnums, h, k):
    """Return True if the server->set(shnum) mapping 'servertoshnums'
    achieves happiness level h: for every h-server subset, each of its
    k-server subsets must together hold at least k distinct share
    numbers. This is a naive brute-force check; see
    src/allmydata/util/happinessutil.py for a better algorithm."""
    if len(servertoshnums) < h:
        return False
    for happyset in combinations(servertoshnums.iterkeys(), h):
        for serversubset in combinations(happyset, k):
            shnums = set()
            for server in serversubset:
                shnums.update(servertoshnums[server])
            if len(shnums) < k:
                return False
    return True
742
class FakeServerTracker:
    """Bare-bones stand-in for a ServerTracker: just a serverid plus the
    bucket map it is responsible for."""
    def __init__(self, serverid, buckets):
        self._serverid = serverid
        self.buckets = buckets

    def get_serverid(self):
        return self._serverid
749
750 class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
751     ShouldFailMixin):
752     def find_all_shares(self, unused=None):
753         """Locate shares on disk. Returns a dict that maps
754         server to set of sharenums.
755         """
756         assert self.g, "I tried to find a grid at self.g, but failed"
757         servertoshnums = {} # k: server, v: set(shnum)
758
759         for i, c in self.g.servers_by_number.iteritems():
760             for (dirp, dirns, fns) in os.walk(c.sharedir):
761                 for fn in fns:
762                     try:
763                         sharenum = int(fn)
764                     except TypeError:
765                         # Whoops, I guess that's not a share file then.
766                         pass
767                     else:
768                         servertoshnums.setdefault(i, set()).add(sharenum)
769
770         return servertoshnums
771
    def _do_upload_with_broken_servers(self, servers_to_break):
        """
        I act like a normal upload, but before I send the results of
        Tahoe2ServerSelector to the Encoder, I break the first
        servers_to_break ServerTrackers in the upload_servers part of the
        return result.
        """
        assert self.g, "I tried to find a grid at self.g, but failed"
        broker = self.g.clients[0].storage_broker
        sh     = self.g.clients[0]._secret_holder
        data = upload.Data("data" * 10000, convergence="")
        data.set_default_encoding_parameters({'k': 3, 'happy': 4, 'n': 10})
        uploadable = upload.EncryptAnUploadable(data)
        encoder = encode.Encoder()
        encoder.set_encrypted_uploadable(uploadable)
        status = upload.UploadStatus()
        selector = upload.Tahoe2ServerSelector("dglev", "test", status)
        storage_index = encoder.get_param("storage_index")
        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        # Run server selection with n=10, k=3, happy=4.
        d = selector.get_shareholders(broker, sh, storage_index,
                                      share_size, block_size, num_segments,
                                      10, 3, 4)
        def _have_shareholders((upload_trackers, already_servers)):
            assert servers_to_break <= len(upload_trackers)
            # "Break" the first servers_to_break trackers by aborting all
            # of their open buckets, simulating servers that fail after
            # selection but before any shares are written.
            for index in xrange(servers_to_break):
                tracker = list(upload_trackers)[index]
                for share in tracker.buckets.keys():
                    tracker.buckets[share].abort()
            # Build the bucket dict and shnum->set(serverid) servermap
            # (including the aborted buckets) and hand them to the
            # Encoder, as a real upload would.
            buckets = {}
            servermap = already_servers.copy()
            for tracker in upload_trackers:
                buckets.update(tracker.buckets)
                for bucket in tracker.buckets:
                    servermap.setdefault(bucket, set()).add(tracker.get_serverid())
            encoder.set_shareholders(buckets, servermap)
            d = encoder.start()
            return d
        d.addCallback(_have_shareholders)
        return d
813
814     def _has_happy_share_distribution(self):
815         servertoshnums = self.find_all_shares()
816         k = self.g.clients[0].encoding_params['k']
817         h = self.g.clients[0].encoding_params['happy']
818         return is_happy_enough(servertoshnums, h, k)
819
820     def _add_server(self, server_number, readonly=False):
821         assert self.g, "I tried to find a grid at self.g, but failed"
822         ss = self.g.make_server(server_number, readonly)
823         log.msg("just created a server, number: %s => %s" % (server_number, ss,))
824         self.g.add_server(server_number, ss)
825
826     def _add_server_with_share(self, server_number, share_number=None,
827                                readonly=False):
828         self._add_server(server_number, readonly)
829         if share_number is not None:
830             self._copy_share_to_server(share_number, server_number)
831
832
833     def _copy_share_to_server(self, share_number, server_number):
834         ss = self.g.servers_by_number[server_number]
835         # Copy share i from the directory associated with the first
836         # storage server to the directory associated with this one.
837         assert self.g, "I tried to find a grid at self.g, but failed"
838         assert self.shares, "I tried to find shares at self.shares, but failed"
839         old_share_location = self.shares[share_number][2]
840         new_share_location = os.path.join(ss.storedir, "shares")
841         si = uri.from_string(self.uri).get_storage_index()
842         new_share_location = os.path.join(new_share_location,
843                                           storage_index_to_dir(si))
844         if not os.path.exists(new_share_location):
845             os.makedirs(new_share_location)
846         new_share_location = os.path.join(new_share_location,
847                                           str(share_number))
848         if old_share_location != new_share_location:
849             shutil.copy(old_share_location, new_share_location)
850         shares = self.find_uri_shares(self.uri)
851         # Make sure that the storage server has the share.
852         self.failUnless((share_number, ss.my_nodeid, new_share_location)
853                         in shares)
854
855     def _setup_grid(self):
856         """
857         I set up a NoNetworkGrid with a single server and client.
858         """
859         self.set_up_grid(num_clients=1, num_servers=1)
860
861     def _setup_and_upload(self, **kwargs):
862         """
863         I set up a NoNetworkGrid with a single server and client,
864         upload a file to it, store its uri in self.uri, and store its
865         sharedata in self.shares.
866         """
867         self._setup_grid()
868         client = self.g.clients[0]
869         client.encoding_params['happy'] = 1
870         if "n" in kwargs and "k" in kwargs:
871             client.encoding_params['k'] = kwargs['k']
872             client.encoding_params['n'] = kwargs['n']
873         data = upload.Data("data" * 10000, convergence="")
874         self.data = data
875         d = client.upload(data)
876         def _store_uri(ur):
877             self.uri = ur.get_uri()
878         d.addCallback(_store_uri)
879         d.addCallback(lambda ign:
880             self.find_uri_shares(self.uri))
881         def _store_shares(shares):
882             self.shares = shares
883         d.addCallback(_store_shares)
884         return d
885
886     def test_configure_parameters(self):
887         self.basedir = self.mktemp()
888         hooks = {0: self._set_up_nodes_extra_config}
889         self.set_up_grid(client_config_hooks=hooks)
890         c0 = self.g.clients[0]
891
892         DATA = "data" * 100
893         u = upload.Data(DATA, convergence="")
894         d = c0.upload(u)
895         d.addCallback(lambda ur: c0.create_node_from_uri(ur.get_uri()))
896         m = monitor.Monitor()
897         d.addCallback(lambda fn: fn.check(m))
898         def _check(cr):
899             self.failUnlessEqual(cr.get_encoding_needed(), 7)
900             self.failUnlessEqual(cr.get_encoding_expected(), 12)
901         d.addCallback(_check)
902         return d
903
904
905     def _setUp(self, ns):
906         # Used by test_happy_semantics and test_preexisting_share_behavior
907         # to set up the grid.
908         self.node = FakeClient(mode="good", num_servers=ns)
909         self.u = upload.Uploader()
910         self.u.running = True
911         self.u.parent = self.node
912
913
    def test_happy_semantics(self):
        """An upload must fail with UploadUnhappinessError when fewer
        servers are available than the 'happy' parameter requires, and
        must succeed once enough servers exist (even if fewer than n)."""
        self._setUp(2)
        DATA = upload.Data("kittens" * 10000, convergence="")
        # These parameters are unsatisfiable with only 2 servers.
        self.set_encoding_parameters(k=3, happy=5, n=10)
        d = self.shouldFail(UploadUnhappinessError, "test_happy_semantics",
                            "shares could be placed or found on only 2 "
                            "server(s). We were asked to place shares on "
                            "at least 5 server(s) such that any 3 of them "
                            "have enough shares to recover the file",
                            self.u.upload, DATA)
        # Let's reset the client to have 10 servers
        d.addCallback(lambda ign:
            self._setUp(10))
        # These parameters are satisfiable with 10 servers.
        d.addCallback(lambda ign:
            self.set_encoding_parameters(k=3, happy=5, n=10))
        d.addCallback(lambda ign:
            self.u.upload(DATA))
        # Let's reset the client to have 7 servers
        # (this is less than n, but more than h)
        d.addCallback(lambda ign:
            self._setUp(7))
        # These parameters are satisfiable with 7 servers.
        d.addCallback(lambda ign:
            self.set_encoding_parameters(k=3, happy=5, n=10))
        d.addCallback(lambda ign:
            self.u.upload(DATA))
        return d
943
    def test_aborted_shares(self):
        """An unhappy (failed) upload must not leave the first servers'
        BucketWriters stuck open in a way that blocks a later, satisfiable
        retry after a fifth server is added."""
        self.basedir = "upload/EncodingParameters/aborted_shares"
        self.set_up_grid(num_servers=4)
        c = self.g.clients[0]
        DATA = upload.Data(100* "kittens", convergence="")
        # These parameters are unsatisfiable with only 4 servers, but should
        # work with 5, as long as the original 4 are not stuck in the open
        # BucketWriter state (open() but not yet closed or aborted).
        parms = {"k":2, "happy":5, "n":5, "max_segment_size": 1*MiB}
        c.encoding_params = parms
        d = self.shouldFail(UploadUnhappinessError, "test_aborted_shares",
                            "shares could be placed on only 4 "
                            "server(s) such that any 2 of them have enough "
                            "shares to recover the file, but we were asked "
                            "to place shares on at least 5 such servers",
                            c.upload, DATA)
        # now add the 5th server
        d.addCallback(lambda ign: self._add_server(4, False))
        # and this time the upload ought to succeed
        d.addCallback(lambda ign: c.upload(DATA))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))
        return d
967
968
    def test_problem_layout_comment_52(self):
        """Two variants of the share layout from ticket #778 comment:52;
        both should make the upload fail with UploadUnhappinessError."""
        def _basedir():
            self.basedir = self.mktemp()
        _basedir()
        # This scenario is at
        # http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:52
        #
        # The scenario in comment:52 proposes that we have a layout
        # like:
        # server 0: shares 1 - 9
        # server 1: share 0, read-only
        # server 2: share 0, read-only
        # server 3: share 0, read-only
        # To get access to the shares, we will first upload to one
        # server, which will then have shares 0 - 9. We'll then
        # add three new servers, configure them to not accept any new
        # shares, then write share 0 directly into the serverdir of each,
        # and then remove share 0 from server 0 in the same way.
        # Then each of servers 1 - 3 will report that they have share 0,
        # and will not accept any new share, while server 0 will report that
        # it has shares 1 - 9 and will accept new shares.
        # We'll then set 'happy' = 4, and see that an upload fails
        # (as it should)
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=0,
                                        readonly=True))
        # Remove the first share from server 0.
        def _remove_share_0_from_server_0():
            share_location = self.shares[0][2]
            os.remove(share_location)
        d.addCallback(lambda ign:
            _remove_share_0_from_server_0())
        # Set happy = 4 in the client.
        def _prepare():
            client = self.g.clients[0]
            client.encoding_params['happy'] = 4
            return client
        d.addCallback(lambda ign:
            _prepare())
        # Uploading data should fail
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError,
                            "test_problem_layout_comment_52_test_1",
                            "shares could be placed or found on 4 server(s), "
                            "but they are not spread out evenly enough to "
                            "ensure that any 3 of these servers would have "
                            "enough shares to recover the file. "
                            "We were asked to place shares on at "
                            "least 4 servers such that any 3 of them have "
                            "enough shares to recover the file",
                            client.upload, upload.Data("data" * 10000,
                                                       convergence="")))

        # Do comment:52, but like this:
        # server 2: empty
        # server 3: share 0, read-only
        # server 1: share 0, read-only
        # server 0: shares 0-9
        d.addCallback(lambda ign:
            _basedir())
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server(server_number=2))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=0,
                                        readonly=True))
        def _prepare2():
            client = self.g.clients[0]
            client.encoding_params['happy'] = 4
            return client
        d.addCallback(lambda ign:
            _prepare2())
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError,
                            "test_problem_layout_comment_52_test_2",
                            "shares could be placed on only 3 server(s) such "
                            "that any 3 of them have enough shares to recover "
                            "the file, but we were asked to place shares on "
                            "at least 4 such servers.",
                            client.upload, upload.Data("data" * 10000,
                                                       convergence="")))
        return d
1062
1063
    def test_problem_layout_comment_53(self):
        """Three variants of the share layout from ticket #778 comment:53;
        a re-upload should succeed (redistributing shares as needed) in
        all of them."""
        # This scenario is at
        # http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:53
        #
        # Set up the grid to have one server
        def _change_basedir(ign):
            self.basedir = self.mktemp()
        _change_basedir(None)
        # We start by uploading all of the shares to one server.
        # Next, we'll add three new servers to our NoNetworkGrid. We'll add
        # one share from our initial upload to each of these.
        # The counterintuitive ordering of the share numbers is to deal with
        # the permuting of these servers -- distributing the shares this
        # way ensures that the Tahoe2ServerSelector sees them in the order
        # described below.
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1))
        # So, we now have the following layout:
        # server 0: shares 0 - 9
        # server 1: share 2
        # server 2: share 0
        # server 3: share 1
        # We change the 'happy' parameter in the client to 4.
        # The Tahoe2ServerSelector will see the servers permuted as:
        # 2, 3, 1, 0
        # Ideally, a reupload of our original data should work.
        def _reset_encoding_parameters(ign, happy=4):
            client = self.g.clients[0]
            client.encoding_params['happy'] = happy
            return client
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))


        # This scenario is basically comment:53, but changed so that the
        # Tahoe2ServerSelector sees the server with all of the shares before
        # any of the other servers.
        # The layout is:
        # server 2: shares 0 - 9
        # server 3: share 0
        # server 1: share 1
        # server 4: share 2
        # The Tahoe2ServerSelector sees the servers permuted as:
        # 2, 3, 1, 4
        # Note that server 0 has been replaced by server 4; this makes it
        # easier to ensure that the last server seen by Tahoe2ServerSelector
        # has only one share.
        d.addCallback(_change_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2))
        # Copy all of the other shares to server number 2
        def _copy_shares(ign):
            for i in xrange(0, 10):
                self._copy_share_to_server(i, 2)
        d.addCallback(_copy_shares)
        # Remove the first server, and add a placeholder with share 0
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=4, share_number=0))
        # Now try uploading.
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))


        # Try the same thing, but with empty servers after the first one
        # We want to make sure that Tahoe2ServerSelector will redistribute
        # shares as necessary, not simply discover an existing layout.
        # The layout is:
        # server 2: shares 0 - 9
        # server 3: empty
        # server 1: empty
        # server 4: empty
        d.addCallback(_change_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server(server_number=2))
        d.addCallback(lambda ign:
            self._add_server(server_number=3))
        d.addCallback(lambda ign:
            self._add_server(server_number=1))
        d.addCallback(lambda ign:
            self._add_server(server_number=4))
        d.addCallback(_copy_shares)
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        # Make sure that only as many shares as necessary to satisfy
        # servers of happiness were pushed.
        d.addCallback(lambda results:
            self.failUnlessEqual(results.get_pushed_shares(), 3))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))
        return d
1178
    def test_problem_layout_ticket_1124(self):
        """Share layout from ticket #1124: the preexisting distribution
        already satisfies happy=4, so the upload should succeed without
        redistribution. (Marked .todo below.)"""
        self.basedir = self.mktemp()
        d = self._setup_and_upload(k=2, n=4)

        # server 0: shares 0, 1, 2, 3
        # server 1: shares 0, 3
        # server 2: share 1
        # server 3: share 2
        # With this layout, an upload should just be satisfied that the current distribution is good enough, right?
        def _setup(ign):
            self._add_server_with_share(server_number=0, share_number=None)
            self._add_server_with_share(server_number=1, share_number=0)
            self._add_server_with_share(server_number=2, share_number=1)
            self._add_server_with_share(server_number=3, share_number=2)
            # Copy shares
            self._copy_share_to_server(3, 1)
            client = self.g.clients[0]
            client.encoding_params['happy'] = 4
            return client

        d.addCallback(_setup)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))
        return d
    test_problem_layout_ticket_1124.todo = "Fix this after 1.7.1 release."
1206
    def test_happiness_with_some_readonly_servers(self):
        """Read-only servers that already hold distinct shares must still
        count toward happiness: with three read-only servers plus one
        writable one, happy=4 should be satisfiable."""
        # Try the following layout
        # server 2: shares 0-9
        # server 4: share 0, read-only
        # server 3: share 1, read-only
        # server 1: share 2, read-only
        self.basedir = self.mktemp()
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2,
                                        readonly=True))
        # Copy all of the other shares to server number 2
        def _copy_shares(ign):
            for i in xrange(1, 10):
                self._copy_share_to_server(i, 2)
        d.addCallback(_copy_shares)
        # Remove server 0, and add another in its place
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=4, share_number=0,
                                        readonly=True))
        def _reset_encoding_parameters(ign, happy=4):
            client = self.g.clients[0]
            client.encoding_params['happy'] = happy
            return client
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))
        return d
1244
1245
    def test_happiness_with_all_readonly_servers(self):
        """Even when every server is read-only, an upload with happy=4
        should succeed if the existing shares already satisfy happiness."""
        # server 3: share 1, read-only
        # server 1: share 2, read-only
        # server 2: shares 0-9, read-only
        # server 4: share 0, read-only
        # The idea with this test is to make sure that the survey of
        # read-only servers doesn't undercount servers of happiness
        self.basedir = self.mktemp()
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=4, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0,
                                        readonly=True))
        def _copy_shares(ign):
            for i in xrange(1, 10):
                self._copy_share_to_server(i, 2)
        d.addCallback(_copy_shares)
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        def _reset_encoding_parameters(ign, happy=4):
            client = self.g.clients[0]
            client.encoding_params['happy'] = happy
            return client
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))
        return d
1283
1284
1285     def test_dropped_servers_in_encoder(self):
1286         # The Encoder does its own "servers_of_happiness" check if it
1287         # happens to lose a bucket during an upload (it assumes that
1288         # the layout presented to it satisfies "servers_of_happiness"
1289         # until a failure occurs)
1290         #
1291         # This test simulates an upload where servers break after server
1292         # selection, but before they are written to.
1293         def _set_basedir(ign=None):
1294             self.basedir = self.mktemp()
1295         _set_basedir()
1296         d = self._setup_and_upload();
1297         # Add 5 servers
1298         def _do_server_setup(ign):
1299             self._add_server(server_number=1)
1300             self._add_server(server_number=2)
1301             self._add_server(server_number=3)
1302             self._add_server(server_number=4)
1303             self._add_server(server_number=5)
1304         d.addCallback(_do_server_setup)
1305         # remove the original server
1306         # (necessary to ensure that the Tahoe2ServerSelector will distribute
1307         #  all the shares)
1308         def _remove_server(ign):
1309             server = self.g.servers_by_number[0]
1310             self.g.remove_server(server.my_nodeid)
1311         d.addCallback(_remove_server)
1312         # This should succeed; we still have 4 servers, and the
1313         # happiness of the upload is 4.
1314         d.addCallback(lambda ign:
1315             self._do_upload_with_broken_servers(1))
1316         # Now, do the same thing over again, but drop 2 servers instead
1317         # of 1. This should fail, because servers_of_happiness is 4 and
1318         # we can't satisfy that.
1319         d.addCallback(_set_basedir)
1320         d.addCallback(lambda ign:
1321             self._setup_and_upload())
1322         d.addCallback(_do_server_setup)
1323         d.addCallback(_remove_server)
1324         d.addCallback(lambda ign:
1325             self.shouldFail(UploadUnhappinessError,
1326                             "test_dropped_servers_in_encoder",
1327                             "shares could be placed on only 3 server(s) "
1328                             "such that any 3 of them have enough shares to "
1329                             "recover the file, but we were asked to place "
1330                             "shares on at least 4",
1331                             self._do_upload_with_broken_servers, 2))
1332         # Now do the same thing over again, but make some of the servers
1333         # readonly, break some of the ones that aren't, and make sure that
1334         # happiness accounting is preserved.
1335         d.addCallback(_set_basedir)
1336         d.addCallback(lambda ign:
1337             self._setup_and_upload())
1338         def _do_server_setup_2(ign):
1339             self._add_server(1)
1340             self._add_server(2)
1341             self._add_server(3)
1342             self._add_server_with_share(4, 7, readonly=True)
1343             self._add_server_with_share(5, 8, readonly=True)
1344         d.addCallback(_do_server_setup_2)
1345         d.addCallback(_remove_server)
1346         d.addCallback(lambda ign:
1347             self._do_upload_with_broken_servers(1))
1348         d.addCallback(_set_basedir)
1349         d.addCallback(lambda ign:
1350             self._setup_and_upload())
1351         d.addCallback(_do_server_setup_2)
1352         d.addCallback(_remove_server)
1353         d.addCallback(lambda ign:
1354             self.shouldFail(UploadUnhappinessError,
1355                             "test_dropped_servers_in_encoder",
1356                             "shares could be placed on only 3 server(s) "
1357                             "such that any 3 of them have enough shares to "
1358                             "recover the file, but we were asked to place "
1359                             "shares on at least 4",
1360                             self._do_upload_with_broken_servers, 2))
1361         return d
1362
1363
1364     def test_merge_servers(self):
1365         # merge_servers merges a list of upload_servers and a dict of
1366         # shareid -> serverid mappings.
1367         shares = {
1368                     1 : set(["server1"]),
1369                     2 : set(["server2"]),
1370                     3 : set(["server3"]),
1371                     4 : set(["server4", "server5"]),
1372                     5 : set(["server1", "server2"]),
1373                  }
1374         # if not provided with a upload_servers argument, it should just
1375         # return the first argument unchanged.
1376         self.failUnlessEqual(shares, merge_servers(shares, set([])))
1377         trackers = []
1378         for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1379             t = FakeServerTracker(server, [i])
1380             trackers.append(t)
1381         expected = {
1382                     1 : set(["server1"]),
1383                     2 : set(["server2"]),
1384                     3 : set(["server3"]),
1385                     4 : set(["server4", "server5"]),
1386                     5 : set(["server1", "server2", "server5"]),
1387                     6 : set(["server6"]),
1388                     7 : set(["server7"]),
1389                     8 : set(["server8"]),
1390                    }
1391         self.failUnlessEqual(expected, merge_servers(shares, set(trackers)))
1392         shares2 = {}
1393         expected = {
1394                     5 : set(["server5"]),
1395                     6 : set(["server6"]),
1396                     7 : set(["server7"]),
1397                     8 : set(["server8"]),
1398                    }
1399         self.failUnlessEqual(expected, merge_servers(shares2, set(trackers)))
1400         shares3 = {}
1401         trackers = []
1402         expected = {}
1403         for (i, server) in [(i, "server%d" % i) for i in xrange(10)]:
1404             shares3[i] = set([server])
1405             t = FakeServerTracker(server, [i])
1406             trackers.append(t)
1407             expected[i] = set([server])
1408         self.failUnlessEqual(expected, merge_servers(shares3, set(trackers)))
1409
1410
    def test_servers_of_happiness_utility_function(self):
        """Exercise the servers_of_happiness() utility function and its
        underlying bipartite-matching algorithm on hand-built layouts,
        checking that it neither undercounts nor overcounts happiness.
        Other aspects of servers_of_happiness behavior are tested
        elsewhere.
        """
        # servers_of_happiness expects a dict of
        # shnum => set(serverids) as a preexisting shares argument.
        test1 = {
                 1 : set(["server1"]),
                 2 : set(["server2"]),
                 3 : set(["server3"]),
                 4 : set(["server4"])
                }
        happy = servers_of_happiness(test1)
        self.failUnlessEqual(4, happy)
        test1[4] = set(["server1"])
        # We've added a duplicate server, so now servers_of_happiness
        # should be 3 instead of 4.
        happy = servers_of_happiness(test1)
        self.failUnlessEqual(3, happy)
        # The second argument of merge_servers should be a set of objects with
        # serverid and buckets as attributes. In actual use, these will be
        # ServerTracker instances, but for testing it is fine to make a
        # FakeServerTracker whose job is to hold those instance variables to
        # test that part.
        trackers = []
        for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
            t = FakeServerTracker(server, [i])
            trackers.append(t)
        # Recall that test1 is a server layout with servers_of_happiness
        # = 3.  Since there isn't any overlap between the shnum ->
        # set([serverid]) correspondences in test1 and those in trackers,
        # the result here should be 7.
        test2 = merge_servers(test1, set(trackers))
        happy = servers_of_happiness(test2)
        self.failUnlessEqual(7, happy)
        # Now add an overlapping server to trackers. This is redundant,
        # so it should not cause the previously reported happiness value
        # to change.
        t = FakeServerTracker("server1", [1])
        trackers.append(t)
        test2 = merge_servers(test1, set(trackers))
        happy = servers_of_happiness(test2)
        self.failUnlessEqual(7, happy)
        # An empty share assignment has a happiness of zero.
        test = {}
        happy = servers_of_happiness(test)
        self.failUnlessEqual(0, happy)
        # Test a more substantial overlap between the trackers and the
        # existing assignments.
        test = {
            1 : set(['server1']),
            2 : set(['server2']),
            3 : set(['server3']),
            4 : set(['server4']),
        }
        trackers = []
        t = FakeServerTracker('server5', [4])
        trackers.append(t)
        t = FakeServerTracker('server6', [3, 5])
        trackers.append(t)
        # The value returned by servers_of_happiness is the size
        # of a maximum matching in the bipartite graph that
        # servers_of_happiness() makes between serverids and share
        # numbers. It should find something like this:
        # (server 1, share 1)
        # (server 2, share 2)
        # (server 3, share 3)
        # (server 5, share 4)
        # (server 6, share 5)
        #
        # and, since there are 5 edges in this matching, it should
        # return 5.
        test2 = merge_servers(test, set(trackers))
        happy = servers_of_happiness(test2)
        self.failUnlessEqual(5, happy)
        # Zooko's first puzzle:
        # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:156)
        #
        # server 1: shares 0, 1
        # server 2: shares 1, 2
        # server 3: share 2
        #
        # This should yield happiness of 3.
        test = {
            0 : set(['server1']),
            1 : set(['server1', 'server2']),
            2 : set(['server2', 'server3']),
        }
        self.failUnlessEqual(3, servers_of_happiness(test))
        # Zooko's second puzzle:
        # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:158)
        #
        # server 1: shares 0, 1
        # server 2: share 1
        #
        # This should yield happiness of 2.
        test = {
            0 : set(['server1']),
            1 : set(['server1', 'server2']),
        }
        self.failUnlessEqual(2, servers_of_happiness(test))
1515
1516
1517     def test_shares_by_server(self):
1518         test = dict([(i, set(["server%d" % i])) for i in xrange(1, 5)])
1519         sbs = shares_by_server(test)
1520         self.failUnlessEqual(set([1]), sbs["server1"])
1521         self.failUnlessEqual(set([2]), sbs["server2"])
1522         self.failUnlessEqual(set([3]), sbs["server3"])
1523         self.failUnlessEqual(set([4]), sbs["server4"])
1524         test1 = {
1525                     1 : set(["server1"]),
1526                     2 : set(["server1"]),
1527                     3 : set(["server1"]),
1528                     4 : set(["server2"]),
1529                     5 : set(["server2"])
1530                 }
1531         sbs = shares_by_server(test1)
1532         self.failUnlessEqual(set([1, 2, 3]), sbs["server1"])
1533         self.failUnlessEqual(set([4, 5]), sbs["server2"])
1534         # This should fail unless the serverid part of the mapping is a set
1535         test2 = {1: "server1"}
1536         self.shouldFail(AssertionError,
1537                        "test_shares_by_server",
1538                        "",
1539                        shares_by_server, test2)
1540
1541
1542     def test_existing_share_detection(self):
1543         self.basedir = self.mktemp()
1544         d = self._setup_and_upload()
1545         # Our final setup should look like this:
1546         # server 1: shares 0 - 9, read-only
1547         # server 2: empty
1548         # server 3: empty
1549         # server 4: empty
1550         # The purpose of this test is to make sure that the server selector
1551         # knows about the shares on server 1, even though it is read-only.
1552         # It used to simply filter these out, which would cause the test
1553         # to fail when servers_of_happiness = 4.
1554         d.addCallback(lambda ign:
1555             self._add_server_with_share(1, 0, True))
1556         d.addCallback(lambda ign:
1557             self._add_server(2))
1558         d.addCallback(lambda ign:
1559             self._add_server(3))
1560         d.addCallback(lambda ign:
1561             self._add_server(4))
1562         def _copy_shares(ign):
1563             for i in xrange(1, 10):
1564                 self._copy_share_to_server(i, 1)
1565         d.addCallback(_copy_shares)
1566         d.addCallback(lambda ign:
1567             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1568         def _prepare_client(ign):
1569             client = self.g.clients[0]
1570             client.encoding_params['happy'] = 4
1571             return client
1572         d.addCallback(_prepare_client)
1573         d.addCallback(lambda client:
1574             client.upload(upload.Data("data" * 10000, convergence="")))
1575         d.addCallback(lambda ign:
1576             self.failUnless(self._has_happy_share_distribution()))
1577         return d
1578
1579
    def test_query_counting(self):
        """If server selection fails, Tahoe2ServerSelector reports query
        statistics in the UploadUnhappinessError message; check that the
        counts are accurate for three different grid layouts.
        """
        # If server selection fails, Tahoe2ServerSelector prints out a lot
        # of helpful diagnostic information, including query stats.
        # This test helps make sure that that information is accurate.
        self.basedir = self.mktemp()
        d = self._setup_and_upload()
        def _setup(ign):
            # ten empty writable servers, original server 0 removed
            for i in xrange(1, 11):
                self._add_server(server_number=i)
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
            c = self.g.clients[0]
            # We set happy to an unsatisfiable value so that we can check the
            # counting in the exception message. The same progress message
            # is also used when the upload is successful, but in that case it
            # only gets written to a log, so we can't see what it says.
            c.encoding_params['happy'] = 45
            return c
        d.addCallback(_setup)
        d.addCallback(lambda c:
            self.shouldFail(UploadUnhappinessError, "test_query_counting",
                            "10 queries placed some shares",
                            c.upload, upload.Data("data" * 10000,
                                                  convergence="")))
        # Now try with some readonly servers. We want to make sure that
        # the readonly server share discovery phase is counted correctly.
        def _reset(ign):
            # fresh basedir + grid for the next phase
            self.basedir = self.mktemp()
            self.g = None
        d.addCallback(_reset)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        def _then(ign):
            # ten writable servers plus two read-only ones
            for i in xrange(1, 11):
                self._add_server(server_number=i)
            self._add_server(server_number=11, readonly=True)
            self._add_server(server_number=12, readonly=True)
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
            c = self.g.clients[0]
            c.encoding_params['happy'] = 45
            return c
        d.addCallback(_then)
        d.addCallback(lambda c:
            self.shouldFail(UploadUnhappinessError, "test_query_counting",
                            "2 placed none (of which 2 placed none due to "
                            "the server being full",
                            c.upload, upload.Data("data" * 10000,
                                                  convergence="")))
        # Now try the case where the upload process finds a bunch of the
        # shares that it wants to place on the first server, including
        # the one that it wanted to allocate there. Though no shares will
        # be allocated in this request, it should still be called
        # productive, since it caused some homeless shares to be
        # removed.
        d.addCallback(_reset)
        d.addCallback(lambda ign:
            self._setup_and_upload())

        def _next(ign):
            for i in xrange(1, 11):
                self._add_server(server_number=i)
            # Copy all of the shares to server 9, since that will be
            # the first one that the selector sees.
            for i in xrange(10):
                self._copy_share_to_server(i, 9)
            # Remove server 0, and its contents
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
            # Make happiness unsatisfiable
            c = self.g.clients[0]
            c.encoding_params['happy'] = 45
            return c
        d.addCallback(_next)
        d.addCallback(lambda c:
            self.shouldFail(UploadUnhappinessError, "test_query_counting",
                            "1 queries placed some shares",
                            c.upload, upload.Data("data" * 10000,
                                                  convergence="")))
        return d
1657
1658
1659     def test_upper_limit_on_readonly_queries(self):
1660         self.basedir = self.mktemp()
1661         d = self._setup_and_upload()
1662         def _then(ign):
1663             for i in xrange(1, 11):
1664                 self._add_server(server_number=i, readonly=True)
1665             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1666             c = self.g.clients[0]
1667             c.encoding_params['k'] = 2
1668             c.encoding_params['happy'] = 4
1669             c.encoding_params['n'] = 4
1670             return c
1671         d.addCallback(_then)
1672         d.addCallback(lambda client:
1673             self.shouldFail(UploadUnhappinessError,
1674                             "test_upper_limit_on_readonly_queries",
1675                             "sent 8 queries to 8 servers",
1676                             client.upload,
1677                             upload.Data('data' * 10000, convergence="")))
1678         return d
1679
1680
    def test_exception_messages_during_server_selection(self):
        """Force server selection to fail in several different ways and
        check that each UploadUnhappinessError message reports accurate
        counts and appropriate wording for that failure mode.
        """
        # Layout 1:
        # server 1: read-only, no shares
        # server 2: read-only, no shares
        # server 3: read-only, no shares
        # server 4: read-only, no shares
        # server 5: read-only, no shares
        # This will fail, but we want to make sure that the log messages
        # are informative about why it has failed.
        self.basedir = self.mktemp()
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server(server_number=1, readonly=True))
        d.addCallback(lambda ign:
            self._add_server(server_number=2, readonly=True))
        d.addCallback(lambda ign:
            self._add_server(server_number=3, readonly=True))
        d.addCallback(lambda ign:
            self._add_server(server_number=4, readonly=True))
        d.addCallback(lambda ign:
            self._add_server(server_number=5, readonly=True))
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        def _reset_encoding_parameters(ign, happy=4):
            # shared callback, reused below with different happy values
            client = self.g.clients[0]
            client.encoding_params['happy'] = happy
            return client
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
                            "placed 0 shares out of 10 "
                            "total (10 homeless), want to place shares on at "
                            "least 4 servers such that any 3 of them have "
                            "enough shares to recover the file, "
                            "sent 5 queries to 5 servers, 0 queries placed "
                            "some shares, 5 placed none "
                            "(of which 5 placed none due to the server being "
                            "full and 0 placed none due to an error)",
                            client.upload,
                            upload.Data("data" * 10000, convergence="")))


        # Layout 2:
        # server 1: read-only, no shares
        # server 2: broken, no shares
        # server 3: read-only, no shares
        # server 4: read-only, no shares
        # server 5: read-only, no shares
        def _reset(ign):
            self.basedir = self.mktemp()
        d.addCallback(_reset)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server(server_number=1, readonly=True))
        d.addCallback(lambda ign:
            self._add_server(server_number=2))
        def _break_server_2(ign):
            serverid = self.g.servers_by_number[2].my_nodeid
            self.g.break_server(serverid)
        d.addCallback(_break_server_2)
        d.addCallback(lambda ign:
            self._add_server(server_number=3, readonly=True))
        d.addCallback(lambda ign:
            self._add_server(server_number=4, readonly=True))
        d.addCallback(lambda ign:
            self._add_server(server_number=5, readonly=True))
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
                            "placed 0 shares out of 10 "
                            "total (10 homeless), want to place shares on at "
                            "least 4 servers such that any 3 of them have "
                            "enough shares to recover the file, "
                            "sent 5 queries to 5 servers, 0 queries placed "
                            "some shares, 5 placed none "
                            "(of which 4 placed none due to the server being "
                            "full and 1 placed none due to an error)",
                            client.upload,
                            upload.Data("data" * 10000, convergence="")))
        # server 0, server 1 = empty, accepting shares
        # This should place all of the shares, but still fail with happy=4.
        # We want to make sure that the exception message is worded correctly.
        d.addCallback(_reset)
        d.addCallback(lambda ign:
            self._setup_grid())
        d.addCallback(lambda ign:
            self._add_server(server_number=1))
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
                            "shares could be placed or found on only 2 "
                            "server(s). We were asked to place shares on at "
                            "least 4 server(s) such that any 3 of them have "
                            "enough shares to recover the file.",
                            client.upload, upload.Data("data" * 10000,
                                                       convergence="")))
        # servers 0 - 4 = empty, accepting shares
        # This too should place all the shares, and this too should fail,
        # but since the effective happiness is more than the k encoding
        # parameter, it should trigger a different error message than the one
        # above.
        d.addCallback(_reset)
        d.addCallback(lambda ign:
            self._setup_grid())
        d.addCallback(lambda ign:
            self._add_server(server_number=1))
        d.addCallback(lambda ign:
            self._add_server(server_number=2))
        d.addCallback(lambda ign:
            self._add_server(server_number=3))
        d.addCallback(lambda ign:
            self._add_server(server_number=4))
        d.addCallback(_reset_encoding_parameters, happy=7)
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
                            "shares could be placed on only 5 server(s) such "
                            "that any 3 of them have enough shares to recover "
                            "the file, but we were asked to place shares on "
                            "at least 7 such servers.",
                            client.upload, upload.Data("data" * 10000,
                                                       convergence="")))
        # server 0: shares 0 - 9
        # server 1: share 0, read-only
        # server 2: share 0, read-only
        # server 3: share 0, read-only
        # This should place all of the shares, but fail with happy=4.
        # Since the number of servers with shares is more than the number
        # necessary to reconstitute the file, this will trigger a different
        # error message than either of those above.
        d.addCallback(_reset)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=0,
                                        readonly=True))
        d.addCallback(_reset_encoding_parameters, happy=7)
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
                            "shares could be placed or found on 4 server(s), "
                            "but they are not spread out evenly enough to "
                            "ensure that any 3 of these servers would have "
                            "enough shares to recover the file. We were asked "
                            "to place shares on at least 7 servers such that "
                            "any 3 of them have enough shares to recover the "
                            "file",
                            client.upload, upload.Data("data" * 10000,
                                                       convergence="")))
        return d
1836
1837
    def test_problem_layout_comment_187(self):
        """Regression test for the share layout from #778 comment 187,
        which broke an initial attempt at a share redistribution
        algorithm; marked .todo because the fix has not landed yet.
        """
        # #778 comment 187 broke an initial attempt at a share
        # redistribution algorithm. This test is here to demonstrate the
        # breakage, and to test that subsequent algorithms don't also
        # break in the same way.
        self.basedir = self.mktemp()
        d = self._setup_and_upload(k=2, n=3)

        # server 1: shares 0, 1, 2, readonly
        # server 2: share 0, readonly
        # server 3: share 0
        def _setup(ign):
            self._add_server_with_share(server_number=1, share_number=0,
                                        readonly=True)
            self._add_server_with_share(server_number=2, share_number=0,
                                        readonly=True)
            self._add_server_with_share(server_number=3, share_number=0)
            # Copy shares 1 and 2 to server 1 as well
            self._copy_share_to_server(1, 1)
            self._copy_share_to_server(2, 1)
            # Remove server 0
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
            client = self.g.clients[0]
            client.encoding_params['happy'] = 3
            return client

        d.addCallback(_setup)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))
        return d
    test_problem_layout_comment_187.todo = "this isn't fixed yet"
1871
    def test_problem_layout_ticket_1118(self):
        """Regression test for #1118: a user hit an assertion in the
        upload code with this share layout. The upload is permitted to
        fail with UploadUnhappinessError here -- the point is that it
        must fail cleanly instead of hitting an assertion.
        """
        # #1118 includes a report from a user who hit an assertion in
        # the upload code with this layout.
        self.basedir = self.mktemp()
        d = self._setup_and_upload(k=2, n=4)

        # server 0: no shares
        # server 1: shares 0, 3
        # server 3: share 1
        # server 2: share 2
        # The order that they get queries is 0, 1, 3, 2
        def _setup(ign):
            self._add_server(server_number=0)
            self._add_server_with_share(server_number=1, share_number=0)
            self._add_server_with_share(server_number=2, share_number=2)
            self._add_server_with_share(server_number=3, share_number=1)
            # Copy share 3 to server 1
            self._copy_share_to_server(3, 1)
            self.delete_all_shares(self.get_serverdir(0))
            client = self.g.clients[0]
            client.encoding_params['happy'] = 4
            return client

        d.addCallback(_setup)
        # Note: actually it should succeed! See
        # test_problem_layout_ticket_1128. But ticket 1118 is just to
        # make it realize that it has failed, so if it raises
        # UploadUnhappinessError then we'll give it the green light
        # for now.
        d.addCallback(lambda ignored:
            self.shouldFail(UploadUnhappinessError,
                            "test_problem_layout_ticket_1118",
                            "",
                            self.g.clients[0].upload, upload.Data("data" * 10000,
                                                       convergence="")))
        return d
1908
    def test_problem_layout_ticket_1128(self):
        """#1128: given the same share layout as
        test_problem_layout_ticket_1118, a sufficiently smart uploader
        should actually succeed with happy=4. Marked .todo until such an
        uploader exists.
        """
        # Same layout as test_problem_layout_ticket_1118 (originally
        # reported in #1118), but here the upload is expected to succeed.
        self.basedir = self.mktemp()
        d = self._setup_and_upload(k=2, n=4)

        # server 0: no shares
        # server 1: shares 0, 3
        # server 3: share 1
        # server 2: share 2
        # The order that they get queries is 0, 1, 3, 2
        def _setup(ign):
            self._add_server(server_number=0)
            self._add_server_with_share(server_number=1, share_number=0)
            self._add_server_with_share(server_number=2, share_number=2)
            self._add_server_with_share(server_number=3, share_number=1)
            # Copy share 3 to server 1
            self._copy_share_to_server(3, 1)
            # Remove shares from server 0
            self.delete_all_shares(self.get_serverdir(0))
            client = self.g.clients[0]
            client.encoding_params['happy'] = 4
            return client

        d.addCallback(_setup)
        d.addCallback(lambda client:
                          client.upload(upload.Data("data" * 10000, convergence="")))
        d.addCallback(lambda ign:
            self.failUnless(self._has_happy_share_distribution()))
        return d
    test_problem_layout_ticket_1128.todo = "Invent a smarter uploader that uploads successfully in this case."
1940
1941     def test_upload_succeeds_with_some_homeless_shares(self):
1942         # If the upload is forced to stop trying to place shares before
1943         # it has placed (or otherwise accounted) for all of them, but it
1944         # has placed enough to satisfy the upload health criteria that
1945         # we're using, it should still succeed.
1946         self.basedir = self.mktemp()
1947         d = self._setup_and_upload()
1948         def _server_setup(ign):
1949             # Add four servers so that we have a layout like this:
1950             # server 1: share 0, read-only
1951             # server 2: share 1, read-only
1952             # server 3: share 2, read-only
1953             # server 4: share 3, read-only
1954             # If we set happy = 4, the upload will manage to satisfy
1955             # servers of happiness, but not place all of the shares; we
1956             # want to test that the upload is declared successful in
1957             # this case.
1958             self._add_server_with_share(server_number=1, share_number=0,
1959                                         readonly=True)
1960             self._add_server_with_share(server_number=2, share_number=1,
1961                                         readonly=True)
1962             self._add_server_with_share(server_number=3, share_number=2,
1963                                         readonly=True)
1964             self._add_server_with_share(server_number=4, share_number=3,
1965                                         readonly=True)
1966             # Remove server 0.
1967             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1968             # Set the client appropriately
1969             c = self.g.clients[0]
1970             c.encoding_params['happy'] = 4
1971             return c
1972         d.addCallback(_server_setup)
1973         d.addCallback(lambda client:
1974             client.upload(upload.Data("data" * 10000, convergence="")))
1975         d.addCallback(lambda ign:
1976             self.failUnless(self._has_happy_share_distribution()))
1977         return d
1978
1979
1980     def test_uploader_skips_over_servers_with_only_one_share(self):
1981         # We want to make sure that the redistribution logic ignores
1982         # servers with only one share, since placing these shares
1983         # elsewhere will at best keep happiness the same as it was, and
1984         # at worst hurt it.
1985         self.basedir = self.mktemp()
1986         d = self._setup_and_upload()
1987         def _server_setup(ign):
1988             # Add some servers so that the upload will need to
1989             # redistribute, but will first pass over a couple of servers
1990             # that don't have enough shares to redistribute before
1991             # finding one that does have shares to redistribute.
1992             self._add_server_with_share(server_number=1, share_number=0)
1993             self._add_server_with_share(server_number=2, share_number=2)
1994             self._add_server_with_share(server_number=3, share_number=1)
1995             self._add_server_with_share(server_number=8, share_number=4)
1996             self._add_server_with_share(server_number=5, share_number=5)
1997             self._add_server_with_share(server_number=10, share_number=7)
1998             for i in xrange(4):
1999                 self._copy_share_to_server(i, 2)
2000             return self.g.clients[0]
2001         d.addCallback(_server_setup)
2002         d.addCallback(lambda client:
2003             client.upload(upload.Data("data" * 10000, convergence="")))
2004         d.addCallback(lambda ign:
2005             self.failUnless(self._has_happy_share_distribution()))
2006         return d
2007
2008
2009     def test_server_selector_bucket_abort(self):
2010         # If server selection for an upload fails due to an unhappy
2011         # layout, the server selection process should abort the buckets it
2012         # allocates before failing, so that the space can be re-used.
2013         self.basedir = self.mktemp()
2014         self.set_up_grid(num_servers=5)
2015
2016         # Try to upload a file with happy=7, which is unsatisfiable with
2017         # the current grid. This will fail, but should not take up any
2018         # space on the storage servers after it fails.
2019         client = self.g.clients[0]
2020         client.encoding_params['happy'] = 7
2021         d = defer.succeed(None)
2022         d.addCallback(lambda ignored:
2023             self.shouldFail(UploadUnhappinessError,
2024                             "test_server_selection_bucket_abort",
2025                             "",
2026                             client.upload, upload.Data("data" * 10000,
2027                                                        convergence="")))
2028         # wait for the abort messages to get there.
2029         def _turn_barrier(res):
2030             return fireEventually(res)
2031         d.addCallback(_turn_barrier)
2032         def _then(ignored):
2033             for server in self.g.servers_by_number.values():
2034                 self.failUnlessEqual(server.allocated_size(), 0)
2035         d.addCallback(_then)
2036         return d
2037
2038
2039     def test_encoder_bucket_abort(self):
2040         # If enough servers die in the process of encoding and uploading
2041         # a file to make the layout unhappy, we should cancel the
2042         # newly-allocated buckets before dying.
2043         self.basedir = self.mktemp()
2044         self.set_up_grid(num_servers=4)
2045
2046         client = self.g.clients[0]
2047         client.encoding_params['happy'] = 7
2048
2049         d = defer.succeed(None)
2050         d.addCallback(lambda ignored:
2051             self.shouldFail(UploadUnhappinessError,
2052                             "test_encoder_bucket_abort",
2053                             "",
2054                             self._do_upload_with_broken_servers, 1))
2055         def _turn_barrier(res):
2056             return fireEventually(res)
2057         d.addCallback(_turn_barrier)
2058         def _then(ignored):
2059             for server in self.g.servers_by_number.values():
2060                 self.failUnlessEqual(server.allocated_size(), 0)
2061         d.addCallback(_then)
2062         return d
2063
2064
2065     def _set_up_nodes_extra_config(self, clientdir):
2066         cfgfn = os.path.join(clientdir, "tahoe.cfg")
2067         oldcfg = open(cfgfn, "r").read()
2068         f = open(cfgfn, "wt")
2069         f.write(oldcfg)
2070         f.write("\n")
2071         f.write("[client]\n")
2072         f.write("shares.needed = 7\n")
2073         f.write("shares.total = 12\n")
2074         f.write("\n")
2075         f.close()
2076         return None
2077
2078 # TODO:
2079 #  upload with exactly 75 servers (shares_of_happiness)
2080 #  have a download fail
2081 #  cancel a download (need to implement more cancel stuff)
2082
2083 # from test_encode:
2084 # NoNetworkGrid, upload part of ciphertext, kill server, continue upload
# check with Kevan: these probably want to live in test_upload; existing tests might already cover them
2086 #     def test_lost_one_shareholder(self): # these are upload-side tests
2087 #     def test_lost_one_shareholder_early(self):
2088 #     def test_lost_many_shareholders(self):
2089 #     def test_lost_all_shareholders(self):