]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_upload.py
d3b80aa069a447d4c9fd782ccd4cb58115b018d3
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_upload.py
1 import os, shutil
2 from cStringIO import StringIO
3 from twisted.trial import unittest
4 from twisted.python.failure import Failure
5 from twisted.internet import defer
6 from foolscap.api import fireEventually
7
8 import allmydata # for __full_version__
9 from allmydata import uri, monitor, client
10 from allmydata.immutable import upload, encode
11 from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
12 from allmydata.util.assertutil import precondition
13 from allmydata.util.deferredutil import DeferredListShouldSucceed
14 from allmydata.test.no_network import GridTestMixin
15 from allmydata.test.common_util import ShouldFailMixin
16 from allmydata.util.happinessutil import servers_of_happiness, \
17                                          shares_by_server, merge_peers
18 from allmydata.storage_client import StorageFarmBroker
19 from allmydata.storage.server import storage_index_to_dir
20
21 MiB = 1024*1024
22
23 def extract_uri(results):
24     return results.uri
25
26 # Some of these took longer than 480 seconds on Zandr's arm box, but this may
27 # have been due to an earlier test ERROR'ing out due to timeout, which seems
28 # to screw up subsequent tests.
29 timeout = 960
30
31 class Uploadable(unittest.TestCase):
32     def shouldEqual(self, data, expected):
33         self.failUnless(isinstance(data, list))
34         for e in data:
35             self.failUnless(isinstance(e, str))
36         s = "".join(data)
37         self.failUnlessEqual(s, expected)
38
39     def test_filehandle_random_key(self):
40         return self._test_filehandle(convergence=None)
41
42     def test_filehandle_convergent_encryption(self):
43         return self._test_filehandle(convergence="some convergence string")
44
45     def _test_filehandle(self, convergence):
46         s = StringIO("a"*41)
47         u = upload.FileHandle(s, convergence=convergence)
48         d = u.get_size()
49         d.addCallback(self.failUnlessEqual, 41)
50         d.addCallback(lambda res: u.read(1))
51         d.addCallback(self.shouldEqual, "a")
52         d.addCallback(lambda res: u.read(80))
53         d.addCallback(self.shouldEqual, "a"*40)
54         d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
55         d.addCallback(lambda res: s.close()) # that privilege is reserved for us
56         return d
57
58     def test_filename(self):
59         basedir = "upload/Uploadable/test_filename"
60         os.makedirs(basedir)
61         fn = os.path.join(basedir, "file")
62         f = open(fn, "w")
63         f.write("a"*41)
64         f.close()
65         u = upload.FileName(fn, convergence=None)
66         d = u.get_size()
67         d.addCallback(self.failUnlessEqual, 41)
68         d.addCallback(lambda res: u.read(1))
69         d.addCallback(self.shouldEqual, "a")
70         d.addCallback(lambda res: u.read(80))
71         d.addCallback(self.shouldEqual, "a"*40)
72         d.addCallback(lambda res: u.close())
73         return d
74
75     def test_data(self):
76         s = "a"*41
77         u = upload.Data(s, convergence=None)
78         d = u.get_size()
79         d.addCallback(self.failUnlessEqual, 41)
80         d.addCallback(lambda res: u.read(1))
81         d.addCallback(self.shouldEqual, "a")
82         d.addCallback(lambda res: u.read(80))
83         d.addCallback(self.shouldEqual, "a"*40)
84         d.addCallback(lambda res: u.close())
85         return d
86
87 class ServerError(Exception):
88     pass
89
90 class SetDEPMixin:
91     def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
92         p = {"k": k,
93              "happy": happy,
94              "n": n,
95              "max_segment_size": max_segsize,
96              }
97         self.node.DEFAULT_ENCODING_PARAMETERS = p
98
99 class FakeStorageServer:
100     def __init__(self, mode):
101         self.mode = mode
102         self.allocated = []
103         self.queries = 0
104         self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
105                          { "maximum-immutable-share-size": 2**32 },
106                          "application-version": str(allmydata.__full_version__),
107                          }
108         if mode == "small":
109             self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
110                              { "maximum-immutable-share-size": 10 },
111                              "application-version": str(allmydata.__full_version__),
112                              }
113
114
115     def callRemote(self, methname, *args, **kwargs):
116         def _call():
117             meth = getattr(self, methname)
118             return meth(*args, **kwargs)
119         d = fireEventually()
120         d.addCallback(lambda res: _call())
121         return d
122
123     def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
124                          sharenums, share_size, canary):
125         #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
126         if self.mode == "first-fail":
127             if self.queries == 0:
128                 raise ServerError
129         if self.mode == "second-fail":
130             if self.queries == 1:
131                 raise ServerError
132         self.queries += 1
133         if self.mode == "full":
134             return (set(), {},)
135         elif self.mode == "already got them":
136             return (set(sharenums), {},)
137         else:
138             for shnum in sharenums:
139                 self.allocated.append( (storage_index, shnum) )
140             return (set(),
141                     dict([( shnum, FakeBucketWriter(share_size) )
142                           for shnum in sharenums]),
143                     )
144
145 class FakeBucketWriter:
146     # a diagnostic version of storageserver.BucketWriter
147     def __init__(self, size):
148         self.data = StringIO()
149         self.closed = False
150         self._size = size
151
152     def callRemote(self, methname, *args, **kwargs):
153         def _call():
154             meth = getattr(self, "remote_" + methname)
155             return meth(*args, **kwargs)
156         d = fireEventually()
157         d.addCallback(lambda res: _call())
158         return d
159
160
161     def callRemoteOnly(self, methname, *args, **kwargs):
162         d = self.callRemote(methname, *args, **kwargs)
163         del d # callRemoteOnly ignores this
164         return None
165
166
167     def remote_write(self, offset, data):
168         precondition(not self.closed)
169         precondition(offset >= 0)
170         precondition(offset+len(data) <= self._size,
171                      "offset=%d + data=%d > size=%d" %
172                      (offset, len(data), self._size))
173         self.data.seek(offset)
174         self.data.write(data)
175
176     def remote_close(self):
177         precondition(not self.closed)
178         self.closed = True
179
180     def remote_abort(self):
181         pass
182
183 class FakeClient:
184     DEFAULT_ENCODING_PARAMETERS = {"k":25,
185                                    "happy": 25,
186                                    "n": 100,
187                                    "max_segment_size": 1*MiB,
188                                    }
189     def __init__(self, mode="good", num_servers=50):
190         self.num_servers = num_servers
191         if type(mode) is str:
192             mode = dict([i,mode] for i in range(num_servers))
193         peers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
194                   for fakeid in range(self.num_servers) ]
195         self.storage_broker = StorageFarmBroker(None, permute_peers=True)
196         for (serverid, server) in peers:
197             self.storage_broker.test_add_server(serverid, server)
198         self.last_peers = [p[1] for p in peers]
199
200     def log(self, *args, **kwargs):
201         pass
202     def get_encoding_parameters(self):
203         return self.DEFAULT_ENCODING_PARAMETERS
204     def get_storage_broker(self):
205         return self.storage_broker
206     _secret_holder = client.SecretHolder("lease secret", "convergence secret")
207
208 class GotTooFarError(Exception):
209     pass
210
211 class GiganticUploadable(upload.FileHandle):
212     def __init__(self, size):
213         self._size = size
214         self._fp = 0
215
216     def get_encryption_key(self):
217         return defer.succeed("\x00" * 16)
218     def get_size(self):
219         return defer.succeed(self._size)
220     def read(self, length):
221         left = self._size - self._fp
222         length = min(left, length)
223         self._fp += length
224         if self._fp > 1000000:
225             # terminate the test early.
226             raise GotTooFarError("we shouldn't be allowed to get this far")
227         return defer.succeed(["\x00" * length])
228     def close(self):
229         pass
230
231 DATA = """
232 Once upon a time, there was a beautiful princess named Buttercup. She lived
233 in a magical land where every file was stored securely among millions of
234 machines, and nobody ever worried about their data being lost ever again.
235 The End.
236 """
237 assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
238
239 SIZE_ZERO = 0
240 SIZE_SMALL = 16
241 SIZE_LARGE = len(DATA)
242
243 def upload_data(uploader, data):
244     u = upload.Data(data, convergence=None)
245     return uploader.upload(u)
246 def upload_filename(uploader, filename):
247     u = upload.FileName(filename, convergence=None)
248     return uploader.upload(u)
249 def upload_filehandle(uploader, fh):
250     u = upload.FileHandle(fh, convergence=None)
251     return uploader.upload(u)
252
253 class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
254     def setUp(self):
255         self.node = FakeClient(mode="good")
256         self.u = upload.Uploader()
257         self.u.running = True
258         self.u.parent = self.node
259
260     def _check_small(self, newuri, size):
261         u = uri.from_string(newuri)
262         self.failUnless(isinstance(u, uri.LiteralFileURI))
263         self.failUnlessEqual(len(u.data), size)
264
265     def _check_large(self, newuri, size):
266         u = uri.from_string(newuri)
267         self.failUnless(isinstance(u, uri.CHKFileURI))
268         self.failUnless(isinstance(u.get_storage_index(), str))
269         self.failUnlessEqual(len(u.get_storage_index()), 16)
270         self.failUnless(isinstance(u.key, str))
271         self.failUnlessEqual(len(u.key), 16)
272         self.failUnlessEqual(u.size, size)
273
274     def get_data(self, size):
275         return DATA[:size]
276
277     def test_too_large(self):
278         # we've removed the 4GiB share size limit (see ticket #346 for
279         # details), but still have an 8-byte field, so the limit is now
280         # 2**64, so make sure we reject files larger than that.
281         k = 3; happy = 7; n = 10
282         self.set_encoding_parameters(k, happy, n)
283         big = k*(2**64)
284         data1 = GiganticUploadable(big)
285         d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
286                             "This file is too large to be uploaded (data_size)",
287                             self.u.upload, data1)
288         data2 = GiganticUploadable(big-3)
289         d.addCallback(lambda res:
290                       self.shouldFail(FileTooLargeError,
291                                       "test_too_large-data2",
292                                       "This file is too large to be uploaded (offsets)",
293                                       self.u.upload, data2))
294         # I don't know where the actual limit is.. it depends upon how large
295         # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
296         return d
297
298     def test_data_zero(self):
299         data = self.get_data(SIZE_ZERO)
300         d = upload_data(self.u, data)
301         d.addCallback(extract_uri)
302         d.addCallback(self._check_small, SIZE_ZERO)
303         return d
304
305     def test_data_small(self):
306         data = self.get_data(SIZE_SMALL)
307         d = upload_data(self.u, data)
308         d.addCallback(extract_uri)
309         d.addCallback(self._check_small, SIZE_SMALL)
310         return d
311
312     def test_data_large(self):
313         data = self.get_data(SIZE_LARGE)
314         d = upload_data(self.u, data)
315         d.addCallback(extract_uri)
316         d.addCallback(self._check_large, SIZE_LARGE)
317         return d
318
319     def test_data_large_odd_segments(self):
320         data = self.get_data(SIZE_LARGE)
321         segsize = int(SIZE_LARGE / 2.5)
322         # we want 3 segments, since that's not a power of two
323         self.set_encoding_parameters(25, 25, 100, segsize)
324         d = upload_data(self.u, data)
325         d.addCallback(extract_uri)
326         d.addCallback(self._check_large, SIZE_LARGE)
327         return d
328
329     def test_filehandle_zero(self):
330         data = self.get_data(SIZE_ZERO)
331         d = upload_filehandle(self.u, StringIO(data))
332         d.addCallback(extract_uri)
333         d.addCallback(self._check_small, SIZE_ZERO)
334         return d
335
336     def test_filehandle_small(self):
337         data = self.get_data(SIZE_SMALL)
338         d = upload_filehandle(self.u, StringIO(data))
339         d.addCallback(extract_uri)
340         d.addCallback(self._check_small, SIZE_SMALL)
341         return d
342
343     def test_filehandle_large(self):
344         data = self.get_data(SIZE_LARGE)
345         d = upload_filehandle(self.u, StringIO(data))
346         d.addCallback(extract_uri)
347         d.addCallback(self._check_large, SIZE_LARGE)
348         return d
349
350     def test_filename_zero(self):
351         fn = "Uploader-test_filename_zero.data"
352         f = open(fn, "wb")
353         data = self.get_data(SIZE_ZERO)
354         f.write(data)
355         f.close()
356         d = upload_filename(self.u, fn)
357         d.addCallback(extract_uri)
358         d.addCallback(self._check_small, SIZE_ZERO)
359         return d
360
361     def test_filename_small(self):
362         fn = "Uploader-test_filename_small.data"
363         f = open(fn, "wb")
364         data = self.get_data(SIZE_SMALL)
365         f.write(data)
366         f.close()
367         d = upload_filename(self.u, fn)
368         d.addCallback(extract_uri)
369         d.addCallback(self._check_small, SIZE_SMALL)
370         return d
371
372     def test_filename_large(self):
373         fn = "Uploader-test_filename_large.data"
374         f = open(fn, "wb")
375         data = self.get_data(SIZE_LARGE)
376         f.write(data)
377         f.close()
378         d = upload_filename(self.u, fn)
379         d.addCallback(extract_uri)
380         d.addCallback(self._check_large, SIZE_LARGE)
381         return d
382
383 class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
384     def make_node(self, mode, num_servers=10):
385         self.node = FakeClient(mode, num_servers)
386         self.u = upload.Uploader()
387         self.u.running = True
388         self.u.parent = self.node
389
390     def _check_large(self, newuri, size):
391         u = uri.from_string(newuri)
392         self.failUnless(isinstance(u, uri.CHKFileURI))
393         self.failUnless(isinstance(u.get_storage_index(), str))
394         self.failUnlessEqual(len(u.get_storage_index()), 16)
395         self.failUnless(isinstance(u.key, str))
396         self.failUnlessEqual(len(u.key), 16)
397         self.failUnlessEqual(u.size, size)
398
399     def test_first_error(self):
400         mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
401         self.make_node(mode)
402         self.set_encoding_parameters(k=25, happy=1, n=50)
403         d = upload_data(self.u, DATA)
404         d.addCallback(extract_uri)
405         d.addCallback(self._check_large, SIZE_LARGE)
406         return d
407
408     def test_first_error_all(self):
409         self.make_node("first-fail")
410         d = self.shouldFail(UploadUnhappinessError, "first_error_all",
411                             "peer selection failed",
412                             upload_data, self.u, DATA)
413         def _check((f,)):
414             self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
415             # there should also be a 'last failure was' message
416             self.failUnlessIn("ServerError", str(f.value))
417         d.addCallback(_check)
418         return d
419
420     def test_second_error(self):
421         # we want to make sure we make it to a third pass. This means that
422         # the first pass was insufficient to place all shares, and at least
423         # one of second pass servers (other than the last one) accepted a
424         # share (so we'll believe that a third pass will be useful). (if
425         # everyone but the last server throws an error, then we'll send all
426         # the remaining shares to the last server at the end of the second
427         # pass, and if that succeeds, we won't make it to a third pass).
428         #
429         # we can achieve this 97.5% of the time by using 40 servers, having
430         # 39 of them fail on the second request, leaving only one to succeed
431         # on the second request. (we need to keep the number of servers low
432         # enough to ensure a second pass with 100 shares).
433         mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
434         self.make_node(mode, 40)
435         d = upload_data(self.u, DATA)
436         d.addCallback(extract_uri)
437         d.addCallback(self._check_large, SIZE_LARGE)
438         return d
439
440     def test_second_error_all(self):
441         self.make_node("second-fail")
442         d = self.shouldFail(UploadUnhappinessError, "second_error_all",
443                             "peer selection failed",
444                             upload_data, self.u, DATA)
445         def _check((f,)):
446             self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
447             # there should also be a 'last failure was' message
448             self.failUnlessIn("ServerError", str(f.value))
449         d.addCallback(_check)
450         return d
451
452 class FullServer(unittest.TestCase):
453     def setUp(self):
454         self.node = FakeClient(mode="full")
455         self.u = upload.Uploader()
456         self.u.running = True
457         self.u.parent = self.node
458
459     def _should_fail(self, f):
460         self.failUnless(isinstance(f, Failure) and f.check(UploadUnhappinessError), f)
461
462     def test_data_large(self):
463         data = DATA
464         d = upload_data(self.u, data)
465         d.addBoth(self._should_fail)
466         return d
467
468 class PeerSelection(unittest.TestCase):
469
470     def make_client(self, num_servers=50):
471         self.node = FakeClient(mode="good", num_servers=num_servers)
472         self.u = upload.Uploader()
473         self.u.running = True
474         self.u.parent = self.node
475
476     def get_data(self, size):
477         return DATA[:size]
478
479     def _check_large(self, newuri, size):
480         u = uri.from_string(newuri)
481         self.failUnless(isinstance(u, uri.CHKFileURI))
482         self.failUnless(isinstance(u.get_storage_index(), str))
483         self.failUnlessEqual(len(u.get_storage_index()), 16)
484         self.failUnless(isinstance(u.key, str))
485         self.failUnlessEqual(len(u.key), 16)
486         self.failUnlessEqual(u.size, size)
487
488     def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
489         p = {"k": k,
490              "happy": happy,
491              "n": n,
492              "max_segment_size": max_segsize,
493              }
494         self.node.DEFAULT_ENCODING_PARAMETERS = p
495
496     def test_one_each(self):
497         # if we have 50 shares, and there are 50 peers, and they all accept a
498         # share, we should get exactly one share per peer
499
500         self.make_client()
501         data = self.get_data(SIZE_LARGE)
502         self.set_encoding_parameters(25, 30, 50)
503         d = upload_data(self.u, data)
504         d.addCallback(extract_uri)
505         d.addCallback(self._check_large, SIZE_LARGE)
506         def _check(res):
507             for p in self.node.last_peers:
508                 allocated = p.allocated
509                 self.failUnlessEqual(len(allocated), 1)
510                 self.failUnlessEqual(p.queries, 1)
511         d.addCallback(_check)
512         return d
513
514     def test_two_each(self):
515         # if we have 100 shares, and there are 50 peers, and they all accept
516         # all shares, we should get exactly two shares per peer
517
518         self.make_client()
519         data = self.get_data(SIZE_LARGE)
520         # if there are 50 peers, then happy needs to be <= 50
521         self.set_encoding_parameters(50, 50, 100)
522         d = upload_data(self.u, data)
523         d.addCallback(extract_uri)
524         d.addCallback(self._check_large, SIZE_LARGE)
525         def _check(res):
526             for p in self.node.last_peers:
527                 allocated = p.allocated
528                 self.failUnlessEqual(len(allocated), 2)
529                 self.failUnlessEqual(p.queries, 2)
530         d.addCallback(_check)
531         return d
532
533     def test_one_each_plus_one_extra(self):
534         # if we have 51 shares, and there are 50 peers, then one peer gets
535         # two shares and the rest get just one
536
537         self.make_client()
538         data = self.get_data(SIZE_LARGE)
539         self.set_encoding_parameters(24, 41, 51)
540         d = upload_data(self.u, data)
541         d.addCallback(extract_uri)
542         d.addCallback(self._check_large, SIZE_LARGE)
543         def _check(res):
544             got_one = []
545             got_two = []
546             for p in self.node.last_peers:
547                 allocated = p.allocated
548                 self.failUnless(len(allocated) in (1,2), len(allocated))
549                 if len(allocated) == 1:
550                     self.failUnlessEqual(p.queries, 1)
551                     got_one.append(p)
552                 else:
553                     self.failUnlessEqual(p.queries, 2)
554                     got_two.append(p)
555             self.failUnlessEqual(len(got_one), 49)
556             self.failUnlessEqual(len(got_two), 1)
557         d.addCallback(_check)
558         return d
559
560     def test_four_each(self):
561         # if we have 200 shares, and there are 50 peers, then each peer gets
562         # 4 shares. The design goal is to accomplish this with only two
563         # queries per peer.
564
565         self.make_client()
566         data = self.get_data(SIZE_LARGE)
567         # if there are 50 peers, then happy should be no more than 50 if
568         # we want this to work.
569         self.set_encoding_parameters(100, 50, 200)
570         d = upload_data(self.u, data)
571         d.addCallback(extract_uri)
572         d.addCallback(self._check_large, SIZE_LARGE)
573         def _check(res):
574             for p in self.node.last_peers:
575                 allocated = p.allocated
576                 self.failUnlessEqual(len(allocated), 4)
577                 self.failUnlessEqual(p.queries, 2)
578         d.addCallback(_check)
579         return d
580
581     def test_three_of_ten(self):
582         # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
583         # 4+4+2
584
585         self.make_client(3)
586         data = self.get_data(SIZE_LARGE)
587         self.set_encoding_parameters(3, 3, 10)
588         d = upload_data(self.u, data)
589         d.addCallback(extract_uri)
590         d.addCallback(self._check_large, SIZE_LARGE)
591         def _check(res):
592             counts = {}
593             for p in self.node.last_peers:
594                 allocated = p.allocated
595                 counts[len(allocated)] = counts.get(len(allocated), 0) + 1
596             histogram = [counts.get(i, 0) for i in range(5)]
597             self.failUnlessEqual(histogram, [0,0,0,2,1])
598         d.addCallback(_check)
599         return d
600
601     def test_some_big_some_small(self):
602         # 10 shares, 20 servers, but half the servers don't support a
603         # share-size large enough for our file
604         mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)])
605         self.node = FakeClient(mode, num_servers=20)
606         self.u = upload.Uploader()
607         self.u.running = True
608         self.u.parent = self.node
609
610         data = self.get_data(SIZE_LARGE)
611         self.set_encoding_parameters(3, 5, 10)
612         d = upload_data(self.u, data)
613         d.addCallback(extract_uri)
614         d.addCallback(self._check_large, SIZE_LARGE)
615         def _check(res):
616             # we should have put one share each on the big peers, and zero
617             # shares on the small peers
618             total_allocated = 0
619             for p in self.node.last_peers:
620                 if p.mode == "good":
621                     self.failUnlessEqual(len(p.allocated), 1)
622                 elif p.mode == "small":
623                     self.failUnlessEqual(len(p.allocated), 0)
624                 total_allocated += len(p.allocated)
625             self.failUnlessEqual(total_allocated, 10)
626         d.addCallback(_check)
627         return d
628
629
630 class StorageIndex(unittest.TestCase):
631     def test_params_must_matter(self):
632         DATA = "I am some data"
633         u = upload.Data(DATA, convergence="")
634         eu = upload.EncryptAnUploadable(u)
635         d1 = eu.get_storage_index()
636
637         # CHK means the same data should encrypt the same way
638         u = upload.Data(DATA, convergence="")
639         eu = upload.EncryptAnUploadable(u)
640         d1a = eu.get_storage_index()
641
642         # but if we use a different convergence string it should be different
643         u = upload.Data(DATA, convergence="wheee!")
644         eu = upload.EncryptAnUploadable(u)
645         d1salt1 = eu.get_storage_index()
646
647         # and if we add yet a different convergence it should be different again
648         u = upload.Data(DATA, convergence="NOT wheee!")
649         eu = upload.EncryptAnUploadable(u)
650         d1salt2 = eu.get_storage_index()
651
652         # and if we use the first string again it should be the same as last time
653         u = upload.Data(DATA, convergence="wheee!")
654         eu = upload.EncryptAnUploadable(u)
655         d1salt1a = eu.get_storage_index()
656
657         # and if we change the encoding parameters, it should be different (from the same convergence string with different encoding parameters)
658         u = upload.Data(DATA, convergence="")
659         u.encoding_param_k = u.default_encoding_param_k + 1
660         eu = upload.EncryptAnUploadable(u)
661         d2 = eu.get_storage_index()
662
663         # and if we use a random key, it should be different than the CHK
664         u = upload.Data(DATA, convergence=None)
665         eu = upload.EncryptAnUploadable(u)
666         d3 = eu.get_storage_index()
667         # and different from another instance
668         u = upload.Data(DATA, convergence=None)
669         eu = upload.EncryptAnUploadable(u)
670         d4 = eu.get_storage_index()
671
672         d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
673         def _done(res):
674             si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
675             self.failUnlessEqual(si1, si1a)
676             self.failIfEqual(si1, si2)
677             self.failIfEqual(si1, si3)
678             self.failIfEqual(si1, si4)
679             self.failIfEqual(si3, si4)
680             self.failIfEqual(si1salt1, si1)
681             self.failIfEqual(si1salt1, si1salt2)
682             self.failIfEqual(si1salt2, si1)
683             self.failUnlessEqual(si1salt1, si1salt1a)
684         d.addCallback(_done)
685         return d
686
687 class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
688     ShouldFailMixin):
689     def _do_upload_with_broken_servers(self, servers_to_break):
690         """
691         I act like a normal upload, but before I send the results of
692         Tahoe2PeerSelector to the Encoder, I break the first servers_to_break
693         PeerTrackers in the used_peers part of the return result.
694         """
695         assert self.g, "I tried to find a grid at self.g, but failed"
696         broker = self.g.clients[0].storage_broker
697         sh     = self.g.clients[0]._secret_holder
698         data = upload.Data("data" * 10000, convergence="")
699         data.encoding_param_k = 3
700         data.encoding_param_happy = 4
701         data.encoding_param_n = 10
702         uploadable = upload.EncryptAnUploadable(data)
703         encoder = encode.Encoder()
704         encoder.set_encrypted_uploadable(uploadable)
705         status = upload.UploadStatus()
706         selector = upload.Tahoe2PeerSelector("dglev", "test", status)
707         storage_index = encoder.get_param("storage_index")
708         share_size = encoder.get_param("share_size")
709         block_size = encoder.get_param("block_size")
710         num_segments = encoder.get_param("num_segments")
711         d = selector.get_shareholders(broker, sh, storage_index,
712                                       share_size, block_size, num_segments,
713                                       10, 3, 4)
714         def _have_shareholders((used_peers, already_peers)):
715             assert servers_to_break <= len(used_peers)
716             for index in xrange(servers_to_break):
717                 server = list(used_peers)[index]
718                 for share in server.buckets.keys():
719                     server.buckets[share].abort()
720             buckets = {}
721             servermap = already_peers.copy()
722             for peer in used_peers:
723                 buckets.update(peer.buckets)
724                 for bucket in peer.buckets:
725                     servermap.setdefault(bucket, set()).add(peer.peerid)
726             encoder.set_shareholders(buckets, servermap)
727             d = encoder.start()
728             return d
729         d.addCallback(_have_shareholders)
730         return d
731
732
733     def _add_server(self, server_number, readonly=False):
734         assert self.g, "I tried to find a grid at self.g, but failed"
735         ss = self.g.make_server(server_number, readonly)
736         self.g.add_server(server_number, ss)
737
738
739     def _add_server_with_share(self, server_number, share_number=None,
740                                readonly=False):
741         self._add_server(server_number, readonly)
742         if share_number is not None:
743             self._copy_share_to_server(share_number, server_number)
744
745
746     def _copy_share_to_server(self, share_number, server_number):
747         ss = self.g.servers_by_number[server_number]
748         # Copy share i from the directory associated with the first
749         # storage server to the directory associated with this one.
750         assert self.g, "I tried to find a grid at self.g, but failed"
751         assert self.shares, "I tried to find shares at self.shares, but failed"
752         old_share_location = self.shares[share_number][2]
753         new_share_location = os.path.join(ss.storedir, "shares")
754         si = uri.from_string(self.uri).get_storage_index()
755         new_share_location = os.path.join(new_share_location,
756                                           storage_index_to_dir(si))
757         if not os.path.exists(new_share_location):
758             os.makedirs(new_share_location)
759         new_share_location = os.path.join(new_share_location,
760                                           str(share_number))
761         if old_share_location != new_share_location:
762             shutil.copy(old_share_location, new_share_location)
763         shares = self.find_shares(self.uri)
764         # Make sure that the storage server has the share.
765         self.failUnless((share_number, ss.my_nodeid, new_share_location)
766                         in shares)
767
768     def _setup_grid(self):
769         """
770         I set up a NoNetworkGrid with a single server and client.
771         """
772         self.set_up_grid(num_clients=1, num_servers=1)
773
774     def _setup_and_upload(self, **kwargs):
775         """
776         I set up a NoNetworkGrid with a single server and client,
777         upload a file to it, store its uri in self.uri, and store its
778         sharedata in self.shares.
779         """
780         self._setup_grid()
781         client = self.g.clients[0]
782         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
783         if "n" in kwargs and "k" in kwargs:
784             client.DEFAULT_ENCODING_PARAMETERS['k'] = kwargs['k']
785             client.DEFAULT_ENCODING_PARAMETERS['n'] = kwargs['n']
786         data = upload.Data("data" * 10000, convergence="")
787         self.data = data
788         d = client.upload(data)
789         def _store_uri(ur):
790             self.uri = ur.uri
791         d.addCallback(_store_uri)
792         d.addCallback(lambda ign:
793             self.find_shares(self.uri))
794         def _store_shares(shares):
795             self.shares = shares
796         d.addCallback(_store_shares)
797         return d
798
799
800     def test_configure_parameters(self):
801         self.basedir = self.mktemp()
802         hooks = {0: self._set_up_nodes_extra_config}
803         self.set_up_grid(client_config_hooks=hooks)
804         c0 = self.g.clients[0]
805
806         DATA = "data" * 100
807         u = upload.Data(DATA, convergence="")
808         d = c0.upload(u)
809         d.addCallback(lambda ur: c0.create_node_from_uri(ur.uri))
810         m = monitor.Monitor()
811         d.addCallback(lambda fn: fn.check(m))
812         def _check(cr):
813             data = cr.get_data()
814             self.failUnlessEqual(data["count-shares-needed"], 7)
815             self.failUnlessEqual(data["count-shares-expected"], 12)
816         d.addCallback(_check)
817         return d
818
819
820     def _setUp(self, ns):
821         # Used by test_happy_semantics and test_preexisting_share_behavior
822         # to set up the grid.
823         self.node = FakeClient(mode="good", num_servers=ns)
824         self.u = upload.Uploader()
825         self.u.running = True
826         self.u.parent = self.node
827
828
829     def test_happy_semantics(self):
830         self._setUp(2)
831         DATA = upload.Data("kittens" * 10000, convergence="")
832         # These parameters are unsatisfiable with only 2 servers.
833         self.set_encoding_parameters(k=3, happy=5, n=10)
834         d = self.shouldFail(UploadUnhappinessError, "test_happy_semantics",
835                             "shares could be placed or found on only 2 "
836                             "server(s). We were asked to place shares on "
837                             "at least 5 server(s) such that any 3 of them "
838                             "have enough shares to recover the file",
839                             self.u.upload, DATA)
840         # Let's reset the client to have 10 servers
841         d.addCallback(lambda ign:
842             self._setUp(10))
843         # These parameters are satisfiable with 10 servers.
844         d.addCallback(lambda ign:
845             self.set_encoding_parameters(k=3, happy=5, n=10))
846         d.addCallback(lambda ign:
847             self.u.upload(DATA))
848         # Let's reset the client to have 7 servers
849         # (this is less than n, but more than h)
850         d.addCallback(lambda ign:
851             self._setUp(7))
852         # These parameters are satisfiable with 7 servers.
853         d.addCallback(lambda ign:
854             self.set_encoding_parameters(k=3, happy=5, n=10))
855         d.addCallback(lambda ign:
856             self.u.upload(DATA))
857         return d
858
859     def test_aborted_shares(self):
860         self.basedir = "upload/EncodingParameters/aborted_shares"
861         self.set_up_grid(num_servers=4)
862         c = self.g.clients[0]
863         DATA = upload.Data(100* "kittens", convergence="")
864         # These parameters are unsatisfiable with only 4 servers, but should
865         # work with 5, as long as the original 4 are not stuck in the open
866         # BucketWriter state (open() but not
867         parms = {"k":2, "happy":5, "n":5, "max_segment_size": 1*MiB}
868         c.DEFAULT_ENCODING_PARAMETERS = parms
869         d = self.shouldFail(UploadUnhappinessError, "test_aborted_shares",
870                             "shares could be placed on only 4 "
871                             "server(s) such that any 2 of them have enough "
872                             "shares to recover the file, but we were asked "
873                             "to place shares on at least 5 such servers",
874                             c.upload, DATA)
875         # now add the 5th server
876         d.addCallback(lambda ign: self._add_server(4, False))
877         # and this time the upload ought to succeed
878         d.addCallback(lambda ign: c.upload(DATA))
879         return d
880
881
882     def test_problem_layout_comment_52(self):
883         def _basedir():
884             self.basedir = self.mktemp()
885         _basedir()
886         # This scenario is at
887         # http://allmydata.org/trac/tahoe/ticket/778#comment:52
888         #
889         # The scenario in comment:52 proposes that we have a layout
890         # like:
891         # server 0: shares 1 - 9
892         # server 1: share 0, read-only
893         # server 2: share 0, read-only
894         # server 3: share 0, read-only
895         # To get access to the shares, we will first upload to one
896         # server, which will then have shares 0 - 9. We'll then
897         # add three new servers, configure them to not accept any new
898         # shares, then write share 0 directly into the serverdir of each,
899         # and then remove share 0 from server 0 in the same way.
900         # Then each of servers 1 - 3 will report that they have share 0,
901         # and will not accept any new share, while server 0 will report that
902         # it has shares 1 - 9 and will accept new shares.
903         # We'll then set 'happy' = 4, and see that an upload fails
904         # (as it should)
905         d = self._setup_and_upload()
906         d.addCallback(lambda ign:
907             self._add_server_with_share(server_number=1, share_number=0,
908                                         readonly=True))
909         d.addCallback(lambda ign:
910             self._add_server_with_share(server_number=2, share_number=0,
911                                         readonly=True))
912         d.addCallback(lambda ign:
913             self._add_server_with_share(server_number=3, share_number=0,
914                                         readonly=True))
915         # Remove the first share from server 0.
916         def _remove_share_0_from_server_0():
917             share_location = self.shares[0][2]
918             os.remove(share_location)
919         d.addCallback(lambda ign:
920             _remove_share_0_from_server_0())
921         # Set happy = 4 in the client.
922         def _prepare():
923             client = self.g.clients[0]
924             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
925             return client
926         d.addCallback(lambda ign:
927             _prepare())
928         # Uploading data should fail
929         d.addCallback(lambda client:
930             self.shouldFail(UploadUnhappinessError,
931                             "test_problem_layout_comment_52_test_1",
932                             "shares could be placed or found on 4 server(s), "
933                             "but they are not spread out evenly enough to "
934                             "ensure that any 3 of these servers would have "
935                             "enough shares to recover the file. "
936                             "We were asked to place shares on at "
937                             "least 4 servers such that any 3 of them have "
938                             "enough shares to recover the file",
939                             client.upload, upload.Data("data" * 10000,
940                                                        convergence="")))
941
942         # Do comment:52, but like this:
943         # server 2: empty
944         # server 3: share 0, read-only
945         # server 1: share 0, read-only
946         # server 0: shares 0-9
947         d.addCallback(lambda ign:
948             _basedir())
949         d.addCallback(lambda ign:
950             self._setup_and_upload())
951         d.addCallback(lambda ign:
952             self._add_server(server_number=2))
953         d.addCallback(lambda ign:
954             self._add_server_with_share(server_number=3, share_number=0,
955                                         readonly=True))
956         d.addCallback(lambda ign:
957             self._add_server_with_share(server_number=1, share_number=0,
958                                         readonly=True))
959         def _prepare2():
960             client = self.g.clients[0]
961             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
962             return client
963         d.addCallback(lambda ign:
964             _prepare2())
965         d.addCallback(lambda client:
966             self.shouldFail(UploadUnhappinessError,
967                             "test_problem_layout_comment_52_test_2",
968                             "shares could be placed on only 3 server(s) such "
969                             "that any 3 of them have enough shares to recover "
970                             "the file, but we were asked to place shares on "
971                             "at least 4 such servers.",
972                             client.upload, upload.Data("data" * 10000,
973                                                        convergence="")))
974         return d
975
976
977     def test_problem_layout_comment_53(self):
978         # This scenario is at
979         # http://allmydata.org/trac/tahoe/ticket/778#comment:53
980         #
981         # Set up the grid to have one server
982         def _change_basedir(ign):
983             self.basedir = self.mktemp()
984         _change_basedir(None)
985         # We start by uploading all of the shares to one server.
986         # Next, we'll add three new servers to our NoNetworkGrid. We'll add
987         # one share from our initial upload to each of these.
988         # The counterintuitive ordering of the share numbers is to deal with
989         # the permuting of these servers -- distributing the shares this
990         # way ensures that the Tahoe2PeerSelector sees them in the order
991         # described below.
992         d = self._setup_and_upload()
993         d.addCallback(lambda ign:
994             self._add_server_with_share(server_number=1, share_number=2))
995         d.addCallback(lambda ign:
996             self._add_server_with_share(server_number=2, share_number=0))
997         d.addCallback(lambda ign:
998             self._add_server_with_share(server_number=3, share_number=1))
999         # So, we now have the following layout:
1000         # server 0: shares 0 - 9
1001         # server 1: share 2
1002         # server 2: share 0
1003         # server 3: share 1
1004         # We change the 'happy' parameter in the client to 4.
1005         # The Tahoe2PeerSelector will see the peers permuted as:
1006         # 2, 3, 1, 0
1007         # Ideally, a reupload of our original data should work.
1008         def _reset_encoding_parameters(ign, happy=4):
1009             client = self.g.clients[0]
1010             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1011             return client
1012         d.addCallback(_reset_encoding_parameters)
1013         d.addCallback(lambda client:
1014             client.upload(upload.Data("data" * 10000, convergence="")))
1015
1016
1017         # This scenario is basically comment:53, but changed so that the
1018         # Tahoe2PeerSelector sees the server with all of the shares before
1019         # any of the other servers.
1020         # The layout is:
1021         # server 2: shares 0 - 9
1022         # server 3: share 0
1023         # server 1: share 1
1024         # server 4: share 2
1025         # The Tahoe2PeerSelector sees the peers permuted as:
1026         # 2, 3, 1, 4
1027         # Note that server 0 has been replaced by server 4; this makes it
1028         # easier to ensure that the last server seen by Tahoe2PeerSelector
1029         # has only one share.
1030         d.addCallback(_change_basedir)
1031         d.addCallback(lambda ign:
1032             self._setup_and_upload())
1033         d.addCallback(lambda ign:
1034             self._add_server_with_share(server_number=2, share_number=0))
1035         d.addCallback(lambda ign:
1036             self._add_server_with_share(server_number=3, share_number=1))
1037         d.addCallback(lambda ign:
1038             self._add_server_with_share(server_number=1, share_number=2))
1039         # Copy all of the other shares to server number 2
1040         def _copy_shares(ign):
1041             for i in xrange(0, 10):
1042                 self._copy_share_to_server(i, 2)
1043         d.addCallback(_copy_shares)
1044         # Remove the first server, and add a placeholder with share 0
1045         d.addCallback(lambda ign:
1046             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1047         d.addCallback(lambda ign:
1048             self._add_server_with_share(server_number=4, share_number=0))
1049         # Now try uploading.
1050         d.addCallback(_reset_encoding_parameters)
1051         d.addCallback(lambda client:
1052             client.upload(upload.Data("data" * 10000, convergence="")))
1053
1054
1055         # Try the same thing, but with empty servers after the first one
1056         # We want to make sure that Tahoe2PeerSelector will redistribute
1057         # shares as necessary, not simply discover an existing layout.
1058         # The layout is:
1059         # server 2: shares 0 - 9
1060         # server 3: empty
1061         # server 1: empty
1062         # server 4: empty
1063         d.addCallback(_change_basedir)
1064         d.addCallback(lambda ign:
1065             self._setup_and_upload())
1066         d.addCallback(lambda ign:
1067             self._add_server(server_number=2))
1068         d.addCallback(lambda ign:
1069             self._add_server(server_number=3))
1070         d.addCallback(lambda ign:
1071             self._add_server(server_number=1))
1072         d.addCallback(lambda ign:
1073             self._add_server(server_number=4))
1074         d.addCallback(_copy_shares)
1075         d.addCallback(lambda ign:
1076             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1077         d.addCallback(_reset_encoding_parameters)
1078         d.addCallback(lambda client:
1079             client.upload(upload.Data("data" * 10000, convergence="")))
1080         # Make sure that only as many shares as necessary to satisfy
1081         # servers of happiness were pushed.
1082         d.addCallback(lambda results:
1083             self.failUnlessEqual(results.pushed_shares, 3))
1084         return d
1085
1086     def test_problem_layout_ticket1124(self):
1087         self.basedir = self.mktemp()
1088         d = self._setup_and_upload(k=2, n=4)
1089
1090         # server 0: shares 0, 1, 2, 3
1091         # server 1: shares 0, 3
1092         # server 2: share 1
1093         # server 3: share 2
1094         # With this layout, an upload should just be satisfied that the current distribution is good enough, right?
1095         def _setup(ign):
1096             self._add_server_with_share(server_number=0, share_number=None)
1097             self._add_server_with_share(server_number=1, share_number=0)
1098             self._add_server_with_share(server_number=2, share_number=1)
1099             self._add_server_with_share(server_number=3, share_number=2)
1100             # Copy shares
1101             self._copy_share_to_server(3, 1)
1102             client = self.g.clients[0]
1103             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1104             return client
1105
1106         d.addCallback(_setup)
1107         d.addCallback(lambda client:
1108             client.upload(upload.Data("data" * 10000, convergence="")))
1109         return d
1110     test_problem_layout_ticket1124.todo = "Fix this after 1.7.1 release."
1111
1112     def test_happiness_with_some_readonly_peers(self):
1113         # Try the following layout
1114         # server 2: shares 0-9
1115         # server 4: share 0, read-only
1116         # server 3: share 1, read-only
1117         # server 1: share 2, read-only
1118         self.basedir = self.mktemp()
1119         d = self._setup_and_upload()
1120         d.addCallback(lambda ign:
1121             self._add_server_with_share(server_number=2, share_number=0))
1122         d.addCallback(lambda ign:
1123             self._add_server_with_share(server_number=3, share_number=1,
1124                                         readonly=True))
1125         d.addCallback(lambda ign:
1126             self._add_server_with_share(server_number=1, share_number=2,
1127                                         readonly=True))
1128         # Copy all of the other shares to server number 2
1129         def _copy_shares(ign):
1130             for i in xrange(1, 10):
1131                 self._copy_share_to_server(i, 2)
1132         d.addCallback(_copy_shares)
1133         # Remove server 0, and add another in its place
1134         d.addCallback(lambda ign:
1135             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1136         d.addCallback(lambda ign:
1137             self._add_server_with_share(server_number=4, share_number=0,
1138                                         readonly=True))
1139         def _reset_encoding_parameters(ign, happy=4):
1140             client = self.g.clients[0]
1141             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1142             return client
1143         d.addCallback(_reset_encoding_parameters)
1144         d.addCallback(lambda client:
1145             client.upload(upload.Data("data" * 10000, convergence="")))
1146         return d
1147
1148
1149     def test_happiness_with_all_readonly_peers(self):
1150         # server 3: share 1, read-only
1151         # server 1: share 2, read-only
1152         # server 2: shares 0-9, read-only
1153         # server 4: share 0, read-only
1154         # The idea with this test is to make sure that the survey of
1155         # read-only peers doesn't undercount servers of happiness
1156         self.basedir = self.mktemp()
1157         d = self._setup_and_upload()
1158         d.addCallback(lambda ign:
1159             self._add_server_with_share(server_number=4, share_number=0,
1160                                         readonly=True))
1161         d.addCallback(lambda ign:
1162             self._add_server_with_share(server_number=3, share_number=1,
1163                                         readonly=True))
1164         d.addCallback(lambda ign:
1165             self._add_server_with_share(server_number=1, share_number=2,
1166                                         readonly=True))
1167         d.addCallback(lambda ign:
1168             self._add_server_with_share(server_number=2, share_number=0,
1169                                         readonly=True))
1170         def _copy_shares(ign):
1171             for i in xrange(1, 10):
1172                 self._copy_share_to_server(i, 2)
1173         d.addCallback(_copy_shares)
1174         d.addCallback(lambda ign:
1175             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1176         def _reset_encoding_parameters(ign, happy=4):
1177             client = self.g.clients[0]
1178             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1179             return client
1180         d.addCallback(_reset_encoding_parameters)
1181         d.addCallback(lambda client:
1182             client.upload(upload.Data("data" * 10000, convergence="")))
1183         return d
1184
1185
1186     def test_dropped_servers_in_encoder(self):
1187         # The Encoder does its own "servers_of_happiness" check if it
1188         # happens to lose a bucket during an upload (it assumes that
1189         # the layout presented to it satisfies "servers_of_happiness"
1190         # until a failure occurs)
1191         #
1192         # This test simulates an upload where servers break after peer
1193         # selection, but before they are written to.
1194         def _set_basedir(ign=None):
1195             self.basedir = self.mktemp()
1196         _set_basedir()
1197         d = self._setup_and_upload();
1198         # Add 5 servers
1199         def _do_server_setup(ign):
1200             self._add_server(server_number=1)
1201             self._add_server(server_number=2)
1202             self._add_server(server_number=3)
1203             self._add_server(server_number=4)
1204             self._add_server(server_number=5)
1205         d.addCallback(_do_server_setup)
1206         # remove the original server
1207         # (necessary to ensure that the Tahoe2PeerSelector will distribute
1208         #  all the shares)
1209         def _remove_server(ign):
1210             server = self.g.servers_by_number[0]
1211             self.g.remove_server(server.my_nodeid)
1212         d.addCallback(_remove_server)
1213         # This should succeed; we still have 4 servers, and the
1214         # happiness of the upload is 4.
1215         d.addCallback(lambda ign:
1216             self._do_upload_with_broken_servers(1))
1217         # Now, do the same thing over again, but drop 2 servers instead
1218         # of 1. This should fail, because servers_of_happiness is 4 and
1219         # we can't satisfy that.
1220         d.addCallback(_set_basedir)
1221         d.addCallback(lambda ign:
1222             self._setup_and_upload())
1223         d.addCallback(_do_server_setup)
1224         d.addCallback(_remove_server)
1225         d.addCallback(lambda ign:
1226             self.shouldFail(UploadUnhappinessError,
1227                             "test_dropped_servers_in_encoder",
1228                             "shares could be placed on only 3 server(s) "
1229                             "such that any 3 of them have enough shares to "
1230                             "recover the file, but we were asked to place "
1231                             "shares on at least 4",
1232                             self._do_upload_with_broken_servers, 2))
1233         # Now do the same thing over again, but make some of the servers
1234         # readonly, break some of the ones that aren't, and make sure that
1235         # happiness accounting is preserved.
1236         d.addCallback(_set_basedir)
1237         d.addCallback(lambda ign:
1238             self._setup_and_upload())
1239         def _do_server_setup_2(ign):
1240             self._add_server(1)
1241             self._add_server(2)
1242             self._add_server(3)
1243             self._add_server_with_share(4, 7, readonly=True)
1244             self._add_server_with_share(5, 8, readonly=True)
1245         d.addCallback(_do_server_setup_2)
1246         d.addCallback(_remove_server)
1247         d.addCallback(lambda ign:
1248             self._do_upload_with_broken_servers(1))
1249         d.addCallback(_set_basedir)
1250         d.addCallback(lambda ign:
1251             self._setup_and_upload())
1252         d.addCallback(_do_server_setup_2)
1253         d.addCallback(_remove_server)
1254         d.addCallback(lambda ign:
1255             self.shouldFail(UploadUnhappinessError,
1256                             "test_dropped_servers_in_encoder",
1257                             "shares could be placed on only 3 server(s) "
1258                             "such that any 3 of them have enough shares to "
1259                             "recover the file, but we were asked to place "
1260                             "shares on at least 4",
1261                             self._do_upload_with_broken_servers, 2))
1262         return d
1263
1264
1265     def test_merge_peers(self):
1266         # merge_peers merges a list of used_peers and a dict of
1267         # shareid -> peerid mappings.
1268         shares = {
1269                     1 : set(["server1"]),
1270                     2 : set(["server2"]),
1271                     3 : set(["server3"]),
1272                     4 : set(["server4", "server5"]),
1273                     5 : set(["server1", "server2"]),
1274                  }
1275         # if not provided with a used_peers argument, it should just
1276         # return the first argument unchanged.
1277         self.failUnlessEqual(shares, merge_peers(shares, set([])))
1278         class FakePeerTracker:
1279             pass
1280         trackers = []
1281         for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1282             t = FakePeerTracker()
1283             t.peerid = server
1284             t.buckets = [i]
1285             trackers.append(t)
1286         expected = {
1287                     1 : set(["server1"]),
1288                     2 : set(["server2"]),
1289                     3 : set(["server3"]),
1290                     4 : set(["server4", "server5"]),
1291                     5 : set(["server1", "server2", "server5"]),
1292                     6 : set(["server6"]),
1293                     7 : set(["server7"]),
1294                     8 : set(["server8"]),
1295                    }
1296         self.failUnlessEqual(expected, merge_peers(shares, set(trackers)))
1297         shares2 = {}
1298         expected = {
1299                     5 : set(["server5"]),
1300                     6 : set(["server6"]),
1301                     7 : set(["server7"]),
1302                     8 : set(["server8"]),
1303                    }
1304         self.failUnlessEqual(expected, merge_peers(shares2, set(trackers)))
1305         shares3 = {}
1306         trackers = []
1307         expected = {}
1308         for (i, server) in [(i, "server%d" % i) for i in xrange(10)]:
1309             shares3[i] = set([server])
1310             t = FakePeerTracker()
1311             t.peerid = server
1312             t.buckets = [i]
1313             trackers.append(t)
1314             expected[i] = set([server])
1315         self.failUnlessEqual(expected, merge_peers(shares3, set(trackers)))
1316
1317
1318     def test_servers_of_happiness_utility_function(self):
1319         # These tests are concerned with the servers_of_happiness()
1320         # utility function, and its underlying matching algorithm. Other
1321         # aspects of the servers_of_happiness behavior are tested
1322         # elsehwere These tests exist to ensure that
1323         # servers_of_happiness doesn't under or overcount the happiness
1324         # value for given inputs.
1325
1326         # servers_of_happiness expects a dict of
1327         # shnum => set(peerids) as a preexisting shares argument.
1328         test1 = {
1329                  1 : set(["server1"]),
1330                  2 : set(["server2"]),
1331                  3 : set(["server3"]),
1332                  4 : set(["server4"])
1333                 }
1334         happy = servers_of_happiness(test1)
1335         self.failUnlessEqual(4, happy)
1336         test1[4] = set(["server1"])
1337         # We've added a duplicate server, so now servers_of_happiness
1338         # should be 3 instead of 4.
1339         happy = servers_of_happiness(test1)
1340         self.failUnlessEqual(3, happy)
1341         # The second argument of merge_peers should be a set of
1342         # objects with peerid and buckets as attributes. In actual use,
1343         # these will be PeerTracker instances, but for testing it is fine
1344         # to make a FakePeerTracker whose job is to hold those instance
1345         # variables to test that part.
1346         class FakePeerTracker:
1347             pass
1348         trackers = []
1349         for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1350             t = FakePeerTracker()
1351             t.peerid = server
1352             t.buckets = [i]
1353             trackers.append(t)
1354         # Recall that test1 is a server layout with servers_of_happiness
1355         # = 3.  Since there isn't any overlap between the shnum ->
1356         # set([peerid]) correspondences in test1 and those in trackers,
1357         # the result here should be 7.
1358         test2 = merge_peers(test1, set(trackers))
1359         happy = servers_of_happiness(test2)
1360         self.failUnlessEqual(7, happy)
1361         # Now add an overlapping server to trackers. This is redundant,
1362         # so it should not cause the previously reported happiness value
1363         # to change.
1364         t = FakePeerTracker()
1365         t.peerid = "server1"
1366         t.buckets = [1]
1367         trackers.append(t)
1368         test2 = merge_peers(test1, set(trackers))
1369         happy = servers_of_happiness(test2)
1370         self.failUnlessEqual(7, happy)
1371         test = {}
1372         happy = servers_of_happiness(test)
1373         self.failUnlessEqual(0, happy)
1374         # Test a more substantial overlap between the trackers and the
1375         # existing assignments.
1376         test = {
1377             1 : set(['server1']),
1378             2 : set(['server2']),
1379             3 : set(['server3']),
1380             4 : set(['server4']),
1381         }
1382         trackers = []
1383         t = FakePeerTracker()
1384         t.peerid = 'server5'
1385         t.buckets = [4]
1386         trackers.append(t)
1387         t = FakePeerTracker()
1388         t.peerid = 'server6'
1389         t.buckets = [3, 5]
1390         trackers.append(t)
1391         # The value returned by servers_of_happiness is the size
1392         # of a maximum matching in the bipartite graph that
1393         # servers_of_happiness() makes between peerids and share
1394         # numbers. It should find something like this:
1395         # (server 1, share 1)
1396         # (server 2, share 2)
1397         # (server 3, share 3)
1398         # (server 5, share 4)
1399         # (server 6, share 5)
1400         #
1401         # and, since there are 5 edges in this matching, it should
1402         # return 5.
1403         test2 = merge_peers(test, set(trackers))
1404         happy = servers_of_happiness(test2)
1405         self.failUnlessEqual(5, happy)
1406         # Zooko's first puzzle:
1407         # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:156)
1408         #
1409         # server 1: shares 0, 1
1410         # server 2: shares 1, 2
1411         # server 3: share 2
1412         #
1413         # This should yield happiness of 3.
1414         test = {
1415             0 : set(['server1']),
1416             1 : set(['server1', 'server2']),
1417             2 : set(['server2', 'server3']),
1418         }
1419         self.failUnlessEqual(3, servers_of_happiness(test))
1420         # Zooko's second puzzle:
1421         # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:158)
1422         #
1423         # server 1: shares 0, 1
1424         # server 2: share 1
1425         #
1426         # This should yield happiness of 2.
1427         test = {
1428             0 : set(['server1']),
1429             1 : set(['server1', 'server2']),
1430         }
1431         self.failUnlessEqual(2, servers_of_happiness(test))
1432
1433
1434     def test_shares_by_server(self):
1435         test = dict([(i, set(["server%d" % i])) for i in xrange(1, 5)])
1436         sbs = shares_by_server(test)
1437         self.failUnlessEqual(set([1]), sbs["server1"])
1438         self.failUnlessEqual(set([2]), sbs["server2"])
1439         self.failUnlessEqual(set([3]), sbs["server3"])
1440         self.failUnlessEqual(set([4]), sbs["server4"])
1441         test1 = {
1442                     1 : set(["server1"]),
1443                     2 : set(["server1"]),
1444                     3 : set(["server1"]),
1445                     4 : set(["server2"]),
1446                     5 : set(["server2"])
1447                 }
1448         sbs = shares_by_server(test1)
1449         self.failUnlessEqual(set([1, 2, 3]), sbs["server1"])
1450         self.failUnlessEqual(set([4, 5]), sbs["server2"])
1451         # This should fail unless the peerid part of the mapping is a set
1452         test2 = {1: "server1"}
1453         self.shouldFail(AssertionError,
1454                        "test_shares_by_server",
1455                        "",
1456                        shares_by_server, test2)
1457
1458
1459     def test_existing_share_detection(self):
1460         self.basedir = self.mktemp()
1461         d = self._setup_and_upload()
1462         # Our final setup should look like this:
1463         # server 1: shares 0 - 9, read-only
1464         # server 2: empty
1465         # server 3: empty
1466         # server 4: empty
1467         # The purpose of this test is to make sure that the peer selector
1468         # knows about the shares on server 1, even though it is read-only.
1469         # It used to simply filter these out, which would cause the test
1470         # to fail when servers_of_happiness = 4.
1471         d.addCallback(lambda ign:
1472             self._add_server_with_share(1, 0, True))
1473         d.addCallback(lambda ign:
1474             self._add_server(2))
1475         d.addCallback(lambda ign:
1476             self._add_server(3))
1477         d.addCallback(lambda ign:
1478             self._add_server(4))
1479         def _copy_shares(ign):
1480             for i in xrange(1, 10):
1481                 self._copy_share_to_server(i, 1)
1482         d.addCallback(_copy_shares)
1483         d.addCallback(lambda ign:
1484             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1485         def _prepare_client(ign):
1486             client = self.g.clients[0]
1487             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1488             return client
1489         d.addCallback(_prepare_client)
1490         d.addCallback(lambda client:
1491             client.upload(upload.Data("data" * 10000, convergence="")))
1492         return d
1493
1494
1495     def test_query_counting(self):
1496         # If peer selection fails, Tahoe2PeerSelector prints out a lot
1497         # of helpful diagnostic information, including query stats.
1498         # This test helps make sure that that information is accurate.
1499         self.basedir = self.mktemp()
1500         d = self._setup_and_upload()
1501         def _setup(ign):
1502             for i in xrange(1, 11):
1503                 self._add_server(server_number=i)
1504             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1505             c = self.g.clients[0]
1506             # We set happy to an unsatisfiable value so that we can check the
1507             # counting in the exception message. The same progress message
1508             # is also used when the upload is successful, but in that case it
1509             # only gets written to a log, so we can't see what it says.
1510             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1511             return c
1512         d.addCallback(_setup)
1513         d.addCallback(lambda c:
1514             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1515                             "10 queries placed some shares",
1516                             c.upload, upload.Data("data" * 10000,
1517                                                   convergence="")))
1518         # Now try with some readonly servers. We want to make sure that
1519         # the readonly peer share discovery phase is counted correctly.
1520         def _reset(ign):
1521             self.basedir = self.mktemp()
1522             self.g = None
1523         d.addCallback(_reset)
1524         d.addCallback(lambda ign:
1525             self._setup_and_upload())
1526         def _then(ign):
1527             for i in xrange(1, 11):
1528                 self._add_server(server_number=i)
1529             self._add_server(server_number=11, readonly=True)
1530             self._add_server(server_number=12, readonly=True)
1531             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1532             c = self.g.clients[0]
1533             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1534             return c
1535         d.addCallback(_then)
1536         d.addCallback(lambda c:
1537             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1538                             "2 placed none (of which 2 placed none due to "
1539                             "the server being full",
1540                             c.upload, upload.Data("data" * 10000,
1541                                                   convergence="")))
1542         # Now try the case where the upload process finds a bunch of the
1543         # shares that it wants to place on the first server, including
1544         # the one that it wanted to allocate there. Though no shares will
1545         # be allocated in this request, it should still be called
1546         # productive, since it caused some homeless shares to be
1547         # removed.
1548         d.addCallback(_reset)
1549         d.addCallback(lambda ign:
1550             self._setup_and_upload())
1551
1552         def _next(ign):
1553             for i in xrange(1, 11):
1554                 self._add_server(server_number=i)
1555             # Copy all of the shares to server 9, since that will be
1556             # the first one that the selector sees.
1557             for i in xrange(10):
1558                 self._copy_share_to_server(i, 9)
1559             # Remove server 0, and its contents
1560             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1561             # Make happiness unsatisfiable
1562             c = self.g.clients[0]
1563             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1564             return c
1565         d.addCallback(_next)
1566         d.addCallback(lambda c:
1567             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1568                             "1 queries placed some shares",
1569                             c.upload, upload.Data("data" * 10000,
1570                                                   convergence="")))
1571         return d
1572
1573
1574     def test_upper_limit_on_readonly_queries(self):
1575         self.basedir = self.mktemp()
1576         d = self._setup_and_upload()
1577         def _then(ign):
1578             for i in xrange(1, 11):
1579                 self._add_server(server_number=i, readonly=True)
1580             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1581             c = self.g.clients[0]
1582             c.DEFAULT_ENCODING_PARAMETERS['k'] = 2
1583             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1584             c.DEFAULT_ENCODING_PARAMETERS['n'] = 4
1585             return c
1586         d.addCallback(_then)
1587         d.addCallback(lambda client:
1588             self.shouldFail(UploadUnhappinessError,
1589                             "test_upper_limit_on_readonly_queries",
1590                             "sent 8 queries to 8 peers",
1591                             client.upload,
1592                             upload.Data('data' * 10000, convergence="")))
1593         return d
1594
1595
1596     def test_exception_messages_during_peer_selection(self):
1597         # server 1: read-only, no shares
1598         # server 2: read-only, no shares
1599         # server 3: read-only, no shares
1600         # server 4: read-only, no shares
1601         # server 5: read-only, no shares
1602         # This will fail, but we want to make sure that the log messages
1603         # are informative about why it has failed.
1604         self.basedir = self.mktemp()
1605         d = self._setup_and_upload()
1606         d.addCallback(lambda ign:
1607             self._add_server(server_number=1, readonly=True))
1608         d.addCallback(lambda ign:
1609             self._add_server(server_number=2, readonly=True))
1610         d.addCallback(lambda ign:
1611             self._add_server(server_number=3, readonly=True))
1612         d.addCallback(lambda ign:
1613             self._add_server(server_number=4, readonly=True))
1614         d.addCallback(lambda ign:
1615             self._add_server(server_number=5, readonly=True))
1616         d.addCallback(lambda ign:
1617             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1618         def _reset_encoding_parameters(ign, happy=4):
1619             client = self.g.clients[0]
1620             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1621             return client
1622         d.addCallback(_reset_encoding_parameters)
1623         d.addCallback(lambda client:
1624             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1625                             "placed 0 shares out of 10 "
1626                             "total (10 homeless), want to place shares on at "
1627                             "least 4 servers such that any 3 of them have "
1628                             "enough shares to recover the file, "
1629                             "sent 5 queries to 5 peers, 0 queries placed "
1630                             "some shares, 5 placed none "
1631                             "(of which 5 placed none due to the server being "
1632                             "full and 0 placed none due to an error)",
1633                             client.upload,
1634                             upload.Data("data" * 10000, convergence="")))
1635
1636
1637         # server 1: read-only, no shares
1638         # server 2: broken, no shares
1639         # server 3: read-only, no shares
1640         # server 4: read-only, no shares
1641         # server 5: read-only, no shares
1642         def _reset(ign):
1643             self.basedir = self.mktemp()
1644         d.addCallback(_reset)
1645         d.addCallback(lambda ign:
1646             self._setup_and_upload())
1647         d.addCallback(lambda ign:
1648             self._add_server(server_number=1, readonly=True))
1649         d.addCallback(lambda ign:
1650             self._add_server(server_number=2))
1651         def _break_server_2(ign):
1652             server = self.g.servers_by_number[2].my_nodeid
1653             # We have to break the server in servers_by_id,
1654             # because the one in servers_by_number isn't wrapped,
1655             # and doesn't look at its broken attribute when answering
1656             # queries.
1657             self.g.servers_by_id[server].broken = True
1658         d.addCallback(_break_server_2)
1659         d.addCallback(lambda ign:
1660             self._add_server(server_number=3, readonly=True))
1661         d.addCallback(lambda ign:
1662             self._add_server(server_number=4, readonly=True))
1663         d.addCallback(lambda ign:
1664             self._add_server(server_number=5, readonly=True))
1665         d.addCallback(lambda ign:
1666             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1667         d.addCallback(_reset_encoding_parameters)
1668         d.addCallback(lambda client:
1669             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1670                             "placed 0 shares out of 10 "
1671                             "total (10 homeless), want to place shares on at "
1672                             "least 4 servers such that any 3 of them have "
1673                             "enough shares to recover the file, "
1674                             "sent 5 queries to 5 peers, 0 queries placed "
1675                             "some shares, 5 placed none "
1676                             "(of which 4 placed none due to the server being "
1677                             "full and 1 placed none due to an error)",
1678                             client.upload,
1679                             upload.Data("data" * 10000, convergence="")))
1680         # server 0, server 1 = empty, accepting shares
1681         # This should place all of the shares, but still fail with happy=4.
1682         # We want to make sure that the exception message is worded correctly.
1683         d.addCallback(_reset)
1684         d.addCallback(lambda ign:
1685             self._setup_grid())
1686         d.addCallback(lambda ign:
1687             self._add_server(server_number=1))
1688         d.addCallback(_reset_encoding_parameters)
1689         d.addCallback(lambda client:
1690             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1691                             "shares could be placed or found on only 2 "
1692                             "server(s). We were asked to place shares on at "
1693                             "least 4 server(s) such that any 3 of them have "
1694                             "enough shares to recover the file.",
1695                             client.upload, upload.Data("data" * 10000,
1696                                                        convergence="")))
1697         # servers 0 - 4 = empty, accepting shares
1698         # This too should place all the shares, and this too should fail,
1699         # but since the effective happiness is more than the k encoding
1700         # parameter, it should trigger a different error message than the one
1701         # above.
1702         d.addCallback(_reset)
1703         d.addCallback(lambda ign:
1704             self._setup_grid())
1705         d.addCallback(lambda ign:
1706             self._add_server(server_number=1))
1707         d.addCallback(lambda ign:
1708             self._add_server(server_number=2))
1709         d.addCallback(lambda ign:
1710             self._add_server(server_number=3))
1711         d.addCallback(lambda ign:
1712             self._add_server(server_number=4))
1713         d.addCallback(_reset_encoding_parameters, happy=7)
1714         d.addCallback(lambda client:
1715             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1716                             "shares could be placed on only 5 server(s) such "
1717                             "that any 3 of them have enough shares to recover "
1718                             "the file, but we were asked to place shares on "
1719                             "at least 7 such servers.",
1720                             client.upload, upload.Data("data" * 10000,
1721                                                        convergence="")))
1722         # server 0: shares 0 - 9
1723         # server 1: share 0, read-only
1724         # server 2: share 0, read-only
1725         # server 3: share 0, read-only
1726         # This should place all of the shares, but fail with happy=4.
1727         # Since the number of servers with shares is more than the number
1728         # necessary to reconstitute the file, this will trigger a different
1729         # error message than either of those above.
1730         d.addCallback(_reset)
1731         d.addCallback(lambda ign:
1732             self._setup_and_upload())
1733         d.addCallback(lambda ign:
1734             self._add_server_with_share(server_number=1, share_number=0,
1735                                         readonly=True))
1736         d.addCallback(lambda ign:
1737             self._add_server_with_share(server_number=2, share_number=0,
1738                                         readonly=True))
1739         d.addCallback(lambda ign:
1740             self._add_server_with_share(server_number=3, share_number=0,
1741                                         readonly=True))
1742         d.addCallback(_reset_encoding_parameters, happy=7)
1743         d.addCallback(lambda client:
1744             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1745                             "shares could be placed or found on 4 server(s), "
1746                             "but they are not spread out evenly enough to "
1747                             "ensure that any 3 of these servers would have "
1748                             "enough shares to recover the file. We were asked "
1749                             "to place shares on at least 7 servers such that "
1750                             "any 3 of them have enough shares to recover the "
1751                             "file",
1752                             client.upload, upload.Data("data" * 10000,
1753                                                        convergence="")))
1754         return d
1755
1756
1757     def test_problem_layout_comment_187(self):
1758         # #778 comment 187 broke an initial attempt at a share
1759         # redistribution algorithm. This test is here to demonstrate the
1760         # breakage, and to test that subsequent algorithms don't also
1761         # break in the same way.
1762         self.basedir = self.mktemp()
1763         d = self._setup_and_upload(k=2, n=3)
1764
1765         # server 1: shares 0, 1, 2, readonly
1766         # server 2: share 0, readonly
1767         # server 3: share 0
1768         def _setup(ign):
1769             self._add_server_with_share(server_number=1, share_number=0,
1770                                         readonly=True)
1771             self._add_server_with_share(server_number=2, share_number=0,
1772                                         readonly=True)
1773             self._add_server_with_share(server_number=3, share_number=0)
1774             # Copy shares
1775             self._copy_share_to_server(1, 1)
1776             self._copy_share_to_server(2, 1)
1777             # Remove server 0
1778             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1779             client = self.g.clients[0]
1780             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 3
1781             return client
1782
1783         d.addCallback(_setup)
1784         d.addCallback(lambda client:
1785             client.upload(upload.Data("data" * 10000, convergence="")))
1786         return d
1787     test_problem_layout_comment_187.todo = "this isn't fixed yet"
1788
1789     def test_problem_layout_ticket_1118(self):
1790         # #1118 includes a report from a user who hit an assertion in
1791         # the upload code with this layout.
1792         self.basedir = self.mktemp()
1793         d = self._setup_and_upload(k=2, n=4)
1794
1795         # server 0: no shares
1796         # server 1: shares 0, 3
1797         # server 3: share 1
1798         # server 2: share 2
1799         # The order that they get queries is 0, 1, 3, 2
1800         def _setup(ign):
1801             self._add_server(server_number=0)
1802             self._add_server_with_share(server_number=1, share_number=0)
1803             self._add_server_with_share(server_number=2, share_number=2)
1804             self._add_server_with_share(server_number=3, share_number=1)
1805             # Copy shares
1806             self._copy_share_to_server(3, 1)
1807             storedir = self.get_serverdir(0)
1808             # remove the storedir, wiping out any existing shares
1809             shutil.rmtree(storedir)
1810             # create an empty storedir to replace the one we just removed
1811             os.mkdir(storedir)
1812             client = self.g.clients[0]
1813             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1814             return client
1815
1816         d.addCallback(_setup)
1817         d.addCallback(lambda client:
1818                           client.upload(upload.Data("data" * 10000, convergence="")))
1819         return d
1820
1821     def test_upload_succeeds_with_some_homeless_shares(self):
1822         # If the upload is forced to stop trying to place shares before
1823         # it has placed (or otherwise accounted) for all of them, but it
1824         # has placed enough to satisfy the upload health criteria that
1825         # we're using, it should still succeed.
1826         self.basedir = self.mktemp()
1827         d = self._setup_and_upload()
1828         def _server_setup(ign):
1829             # Add four servers so that we have a layout like this:
1830             # server 1: share 0, read-only
1831             # server 2: share 1, read-only
1832             # server 3: share 2, read-only
1833             # server 4: share 3, read-only
1834             # If we set happy = 4, the upload will manage to satisfy
1835             # servers of happiness, but not place all of the shares; we
1836             # want to test that the upload is declared successful in
1837             # this case.
1838             self._add_server_with_share(server_number=1, share_number=0,
1839                                         readonly=True)
1840             self._add_server_with_share(server_number=2, share_number=1,
1841                                         readonly=True)
1842             self._add_server_with_share(server_number=3, share_number=2,
1843                                         readonly=True)
1844             self._add_server_with_share(server_number=4, share_number=3,
1845                                         readonly=True)
1846             # Remove server 0.
1847             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1848             # Set the client appropriately
1849             c = self.g.clients[0]
1850             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1851             return c
1852         d.addCallback(_server_setup)
1853         d.addCallback(lambda client:
1854             client.upload(upload.Data("data" * 10000, convergence="")))
1855         return d
1856
1857
1858     def test_uploader_skips_over_servers_with_only_one_share(self):
1859         # We want to make sure that the redistribution logic ignores
1860         # servers with only one share, since placing these shares
1861         # elsewhere will at best keep happiness the same as it was, and
1862         # at worst hurt it.
1863         self.basedir = self.mktemp()
1864         d = self._setup_and_upload()
1865         def _server_setup(ign):
1866             # Add some servers so that the upload will need to
1867             # redistribute, but will first pass over a couple of servers
1868             # that don't have enough shares to redistribute before
1869             # finding one that does have shares to redistribute.
1870             self._add_server_with_share(server_number=1, share_number=0)
1871             self._add_server_with_share(server_number=2, share_number=2)
1872             self._add_server_with_share(server_number=3, share_number=1)
1873             self._add_server_with_share(server_number=8, share_number=4)
1874             self._add_server_with_share(server_number=5, share_number=5)
1875             self._add_server_with_share(server_number=10, share_number=7)
1876             for i in xrange(4):
1877                 self._copy_share_to_server(i, 2)
1878             return self.g.clients[0]
1879         d.addCallback(_server_setup)
1880         d.addCallback(lambda client:
1881             client.upload(upload.Data("data" * 10000, convergence="")))
1882         return d
1883
1884
1885     def test_peer_selector_bucket_abort(self):
1886         # If peer selection for an upload fails due to an unhappy
1887         # layout, the peer selection process should abort the buckets it
1888         # allocates before failing, so that the space can be re-used.
1889         self.basedir = self.mktemp()
1890         self.set_up_grid(num_servers=5)
1891
1892         # Try to upload a file with happy=7, which is unsatisfiable with
1893         # the current grid. This will fail, but should not take up any
1894         # space on the storage servers after it fails.
1895         client = self.g.clients[0]
1896         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
1897         d = defer.succeed(None)
1898         d.addCallback(lambda ignored:
1899             self.shouldFail(UploadUnhappinessError,
1900                             "test_peer_selection_bucket_abort",
1901                             "",
1902                             client.upload, upload.Data("data" * 10000,
1903                                                        convergence="")))
1904         # wait for the abort messages to get there.
1905         def _turn_barrier(res):
1906             return fireEventually(res)
1907         d.addCallback(_turn_barrier)
1908         def _then(ignored):
1909             for server in self.g.servers_by_number.values():
1910                 self.failUnlessEqual(server.allocated_size(), 0)
1911         d.addCallback(_then)
1912         return d
1913
1914
1915     def test_encoder_bucket_abort(self):
1916         # If enough servers die in the process of encoding and uploading
1917         # a file to make the layout unhappy, we should cancel the
1918         # newly-allocated buckets before dying.
1919         self.basedir = self.mktemp()
1920         self.set_up_grid(num_servers=4)
1921
1922         client = self.g.clients[0]
1923         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
1924
1925         d = defer.succeed(None)
1926         d.addCallback(lambda ignored:
1927             self.shouldFail(UploadUnhappinessError,
1928                             "test_encoder_bucket_abort",
1929                             "",
1930                             self._do_upload_with_broken_servers, 1))
1931         def _turn_barrier(res):
1932             return fireEventually(res)
1933         d.addCallback(_turn_barrier)
1934         def _then(ignored):
1935             for server in self.g.servers_by_number.values():
1936                 self.failUnlessEqual(server.allocated_size(), 0)
1937         d.addCallback(_then)
1938         return d
1939
1940
1941     def _set_up_nodes_extra_config(self, clientdir):
1942         cfgfn = os.path.join(clientdir, "tahoe.cfg")
1943         oldcfg = open(cfgfn, "r").read()
1944         f = open(cfgfn, "wt")
1945         f.write(oldcfg)
1946         f.write("\n")
1947         f.write("[client]\n")
1948         f.write("shares.needed = 7\n")
1949         f.write("shares.total = 12\n")
1950         f.write("\n")
1951         f.close()
1952         return None
1953
1954 # TODO:
1955 #  upload with exactly 75 peers (shares_of_happiness)
1956 #  have a download fail
1957 #  cancel a download (need to implement more cancel stuff)