]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_upload.py
tests, NEWS, CREDITS re: #1117
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_upload.py
1 import os, shutil
2 from cStringIO import StringIO
3 from twisted.trial import unittest
4 from twisted.python.failure import Failure
5 from twisted.python import log
6 from twisted.internet import defer
7 from foolscap.api import fireEventually
8
9 import allmydata # for __full_version__
10 from allmydata import uri, monitor, client
11 from allmydata.immutable import upload, encode
12 from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
13 from allmydata.util.assertutil import precondition
14 from allmydata.util.deferredutil import DeferredListShouldSucceed
15 from allmydata.test.no_network import GridTestMixin
16 from allmydata.test.common_util import ShouldFailMixin
17 from allmydata.util.happinessutil import servers_of_happiness, \
18                                          shares_by_server, merge_peers
19 from allmydata.storage_client import StorageFarmBroker
20 from allmydata.storage.server import storage_index_to_dir
21
MiB = 1024*1024  # number of bytes in one mebibyte (2**20)
23
def extract_uri(results):
    """Return the ``uri`` attribute of `results`.

    Used as a Deferred callback to turn an upload-results object into the
    URI string that the ``_check_*`` helpers expect.
    """
    file_uri = results.uri
    return file_uri
26
# Some of these took longer than 480 seconds on Zandr's arm box, but this may
# have been due to an earlier test ERROR'ing out due to timeout, which seems
# to screw up subsequent tests.
# NOTE(review): presumably read by trial as the per-test timeout, in seconds,
# for every test in this module -- confirm.
timeout = 960
31
class Uploadable(unittest.TestCase):
    """Exercise get_size(), read() and close() on upload.FileHandle,
    upload.FileName and upload.Data, each wrapping 41 bytes of "a"."""

    def shouldEqual(self, data, expected):
        """Assert that `data` is a list of str chunks which concatenate to
        `expected`."""
        self.failUnless(isinstance(data, list))
        for e in data:
            self.failUnless(isinstance(e, str))
        s = "".join(data)
        self.failUnlessEqual(s, expected)

    def _check_size_and_reads(self, u):
        """Run the checks shared by every test in this class against `u`.

        `u` must wrap exactly 41 "a" characters. The size is checked, then
        one byte is read, then 80 more are requested (only the remaining 40
        come back), and finally u.close() is called. Returns the Deferred
        that drives the chain.
        """
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close())
        return d

    def test_filehandle_random_key(self):
        return self._test_filehandle(convergence=None)

    def test_filehandle_convergent_encryption(self):
        return self._test_filehandle(convergence="some convergence string")

    def _test_filehandle(self, convergence):
        """Check upload.FileHandle wrapped around an in-memory file."""
        s = StringIO("a"*41)
        u = upload.FileHandle(s, convergence=convergence)
        # u.close() (last step of the shared chain) doesn't close the
        # filehandle: that privilege is reserved for us
        d = self._check_size_and_reads(u)
        d.addCallback(lambda res: s.close())
        return d

    def test_filename(self):
        """Check upload.FileName against a real file on disk."""
        basedir = "upload/Uploadable/test_filename"
        # tolerate a directory left behind by an earlier run
        if not os.path.exists(basedir):
            os.makedirs(basedir)
        fn = os.path.join(basedir, "file")
        f = open(fn, "w")
        try:
            f.write("a"*41)
        finally:
            # don't leak the handle if the write fails
            f.close()
        u = upload.FileName(fn, convergence=None)
        return self._check_size_and_reads(u)

    def test_data(self):
        """Check upload.Data wrapped around an in-memory string."""
        s = "a"*41
        u = upload.Data(s, convergence=None)
        return self._check_size_and_reads(u)
87
class ServerError(Exception):
    """Raised by FakeStorageServer.allocate_buckets to simulate a server
    that fails a request."""
90
class SetDEPMixin:
    """Mixin for test cases that keep a client-like object in self.node."""

    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        """Replace self.node.DEFAULT_ENCODING_PARAMETERS with a new dict
        built from the given k / happy / n / max_segment_size values."""
        params = {}
        params["k"] = k
        params["happy"] = happy
        params["n"] = n
        params["max_segment_size"] = max_segsize
        self.node.DEFAULT_ENCODING_PARAMETERS = params
99
class FakeStorageServer:
    """In-memory stand-in for a remote storage server.

    `mode` selects how allocate_buckets behaves:

     - "first-fail": raises ServerError whenever self.queries == 0. The
       counter is not advanced on failure, so every call fails.
     - "second-fail": the first call succeeds; calls made while
       self.queries == 1 raise ServerError.
     - "full": returns no bucket writers at all.
     - "already got them": returns every requested share number in the
       first element of the result and no bucket writers.
     - "small": behaves like any other mode for allocation, but advertises
       a maximum-immutable-share-size of 10 in self.version.
     - anything else (e.g. "good"): records each (storage_index, shnum) in
       self.allocated and hands back one FakeBucketWriter per share.
    """

    def __init__(self, mode):
        self.mode = mode
        self.allocated = []
        self.queries = 0
        if mode == "small":
            max_share_size = 10
        else:
            max_share_size = 2**32
        self.version = {
            "http://allmydata.org/tahoe/protocols/storage/v1":
                {"maximum-immutable-share-size": max_share_size},
            "application-version": str(allmydata.__full_version__),
            }

    def callRemote(self, methname, *args, **kwargs):
        """Invoke the method named `methname` on a later reactor turn and
        return a Deferred for its result."""
        d = fireEventually()
        d.addCallback(lambda ign: getattr(self, methname)(*args, **kwargs))
        return d

    def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
                         sharenums, share_size, canary):
        """Return a (set, dict) pair as described in the class docstring,
        or raise ServerError in the failing modes."""
        failing = (("first-fail", 0), ("second-fail", 1))
        if (self.mode, self.queries) in failing:
            raise ServerError
        # only successful (non-raising) calls are counted
        self.queries += 1
        if self.mode == "full":
            return (set(), {},)
        if self.mode == "already got them":
            return (set(sharenums), {},)
        self.allocated.extend([(storage_index, shnum)
                               for shnum in sharenums])
        writers = dict([(shnum, FakeBucketWriter(share_size))
                        for shnum in sharenums])
        return (set(), writers,)
145
class FakeBucketWriter:
    """A diagnostic version of storageserver.BucketWriter: it keeps written
    data in memory and checks that writes stay within the declared size and
    happen before close."""

    def __init__(self, size):
        self._size = size
        self.closed = False
        self.data = StringIO()

    def callRemote(self, methname, *args, **kwargs):
        """Invoke self.remote_<methname> on a later reactor turn and return
        a Deferred for its result."""
        d = fireEventually()
        d.addCallback(
            lambda ign: getattr(self, "remote_" + methname)(*args, **kwargs))
        return d

    def callRemoteOnly(self, methname, *args, **kwargs):
        """Like callRemote, but the Deferred is deliberately dropped and
        None is returned."""
        self.callRemote(methname, *args, **kwargs)
        return None

    def remote_write(self, offset, data):
        """Store `data` at `offset`; the writer must still be open and the
        write must fit inside the size given at construction."""
        precondition(not self.closed)
        precondition(offset >= 0)
        end = offset+len(data)
        precondition(end <= self._size,
                     "offset=%d + data=%d > size=%d" %
                     (offset, len(data), self._size))
        self.data.seek(offset)
        self.data.write(data)

    def remote_close(self):
        """Mark the writer closed; closing twice violates a precondition."""
        precondition(not self.closed)
        self.closed = True

    def remote_abort(self):
        """Accept an abort request and do nothing."""
        return None
183
class FakeClient:
    """Minimal client node for upload tests, backed by a StorageFarmBroker
    populated with FakeStorageServer instances.

    `mode` is either a single mode string applied to every server, or a
    mapping from server number (0..num_servers-1) to that server's mode
    string. The servers are kept, in creation order, in self.last_peers so
    tests can inspect them afterwards.
    """
    DEFAULT_ENCODING_PARAMETERS = {"k":25,
                                   "happy": 25,
                                   "n": 100,
                                   "max_segment_size": 1*MiB,
                                   }
    _secret_holder = client.SecretHolder("lease secret", "convergence secret")

    def __init__(self, mode="good", num_servers=50):
        self.num_servers = num_servers
        if isinstance(mode, str):
            # one mode for everybody: expand it into a per-server mapping
            mode = dict([(i, mode) for i in range(num_servers)])
        peers = [ ("%20d" % fakeid, FakeStorageServer(mode[fakeid]))
                  for fakeid in range(self.num_servers) ]
        self.storage_broker = StorageFarmBroker(None, permute_peers=True)
        for (serverid, server) in peers:
            self.storage_broker.test_add_server(serverid, server)
        self.last_peers = [server for (serverid, server) in peers]

    def log(self, *args, **kwargs):
        """Discard log messages."""
        pass

    def get_encoding_parameters(self):
        """Return the current DEFAULT_ENCODING_PARAMETERS dict."""
        return self.DEFAULT_ENCODING_PARAMETERS

    def get_storage_broker(self):
        """Return the broker holding the fake servers."""
        return self.storage_broker
208
class GotTooFarError(Exception):
    """Raised by GiganticUploadable.read once more than 1000000 bytes have
    been consumed, to stop a test that should already have failed."""
211
class GiganticUploadable(upload.FileHandle):
    """Uploadable that claims to be `size` bytes long and produces zero
    bytes on demand, without holding any real data."""

    def __init__(self, size):
        # the parent constructor is intentionally not called here
        self._fp = 0
        self._size = size

    def get_encryption_key(self):
        """Return a Deferred firing with a fixed 16-byte all-zero key."""
        key = "\x00" * 16
        return defer.succeed(key)

    def get_size(self):
        """Return a Deferred firing with the claimed size."""
        return defer.succeed(self._size)

    def read(self, length):
        """Return a Deferred firing with a one-element list of zero bytes,
        at most `length` long and never past the claimed size.

        Raises GotTooFarError once the read position passes 1000000.
        """
        remaining = self._size - self._fp
        length = min(remaining, length)
        self._fp += length
        if self._fp > 1000000:
            # terminate the test early.
            raise GotTooFarError("we shouldn't be allowed to get this far")
        chunk = "\x00" * length
        return defer.succeed([chunk])

    def close(self):
        """Nothing to release."""
        pass
231
# Sample plaintext uploaded by the tests in this module.
DATA = """
Once upon a time, there was a beautiful princess named Buttercup. She lived
in a magical land where every file was stored securely among millions of
machines, and nobody ever worried about their data being lost ever again.
The End.
"""
# The "large" tests check for a CHK URI, so DATA has to be longer than the
# uploader's URI_LIT_SIZE_THRESHOLD (presumably the cutoff below which a
# literal URI is produced instead -- confirm in upload.Uploader).
assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD

# Payload lengths used by the tests: empty, a short prefix of DATA, and all
# of DATA.
SIZE_ZERO = 0
SIZE_SMALL = 16
SIZE_LARGE = len(DATA)
243
def upload_data(uploader, data):
    """Wrap the string `data` in upload.Data (convergence=None) and return
    whatever uploader.upload() returns for it."""
    return uploader.upload(upload.Data(data, convergence=None))
def upload_filename(uploader, filename):
    """Wrap the file at `filename` in upload.FileName (convergence=None) and
    return whatever uploader.upload() returns for it."""
    return uploader.upload(upload.FileName(filename, convergence=None))
def upload_filehandle(uploader, fh):
    """Wrap the open file object `fh` in upload.FileHandle
    (convergence=None) and return whatever uploader.upload() returns."""
    return uploader.upload(upload.FileHandle(fh, convergence=None))
253
class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
    """Uploads of empty, small and large payloads against a FakeClient whose
    servers are all in "good" mode, via upload.Data, upload.FileHandle and
    upload.FileName."""

    def setUp(self):
        self.node = FakeClient(mode="good")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _check_small(self, newuri, size):
        """Assert that `newuri` is a literal URI carrying `size` bytes."""
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.LiteralFileURI))
        self.failUnlessEqual(len(u.data), size)

    def _check_large(self, newuri, size):
        """Assert that `newuri` is a CHK URI for a file of `size` bytes with
        a 16-byte storage index and a 16-byte key."""
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def get_data(self, size):
        """Return the first `size` bytes of DATA."""
        return DATA[:size]

    def _expect_small(self, d, size):
        """Extend upload Deferred `d` with the literal-URI checks."""
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, size)
        return d

    def _expect_large(self, d, size):
        """Extend upload Deferred `d` with the CHK-URI checks."""
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, size)
        return d

    def _upload_file(self, fn, size):
        """Write `size` bytes of DATA to the file `fn`, then start uploading
        that file; returns the upload Deferred."""
        data = self.get_data(size)
        f = open(fn, "wb")
        try:
            f.write(data)
        finally:
            # don't leak the handle if the write fails
            f.close()
        return upload_filename(self.u, fn)

    def test_too_large(self):
        # we've removed the 4GiB share size limit (see ticket #346 for
        # details), but still have an 8-byte field, so the limit is now
        # 2**64, so make sure we reject files larger than that.
        k = 3; happy = 7; n = 10
        self.set_encoding_parameters(k, happy, n)
        big = k*(2**64)
        data1 = GiganticUploadable(big)
        d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
                            "This file is too large to be uploaded (data_size)",
                            self.u.upload, data1)
        data2 = GiganticUploadable(big-3)
        d.addCallback(lambda res:
                      self.shouldFail(FileTooLargeError,
                                      "test_too_large-data2",
                                      "This file is too large to be uploaded (offsets)",
                                      self.u.upload, data2))
        # I don't know where the actual limit is.. it depends upon how large
        # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
        return d

    def test_data_zero(self):
        d = upload_data(self.u, self.get_data(SIZE_ZERO))
        return self._expect_small(d, SIZE_ZERO)

    def test_data_small(self):
        d = upload_data(self.u, self.get_data(SIZE_SMALL))
        return self._expect_small(d, SIZE_SMALL)

    def test_data_large(self):
        d = upload_data(self.u, self.get_data(SIZE_LARGE))
        return self._expect_large(d, SIZE_LARGE)

    def test_data_large_odd_segments(self):
        data = self.get_data(SIZE_LARGE)
        segsize = int(SIZE_LARGE / 2.5)
        # we want 3 segments, since that's not a power of two
        self.set_encoding_parameters(25, 25, 100, segsize)
        d = upload_data(self.u, data)
        return self._expect_large(d, SIZE_LARGE)

    def test_filehandle_zero(self):
        d = upload_filehandle(self.u, StringIO(self.get_data(SIZE_ZERO)))
        return self._expect_small(d, SIZE_ZERO)

    def test_filehandle_small(self):
        d = upload_filehandle(self.u, StringIO(self.get_data(SIZE_SMALL)))
        return self._expect_small(d, SIZE_SMALL)

    def test_filehandle_large(self):
        d = upload_filehandle(self.u, StringIO(self.get_data(SIZE_LARGE)))
        return self._expect_large(d, SIZE_LARGE)

    def test_filename_zero(self):
        d = self._upload_file("Uploader-test_filename_zero.data", SIZE_ZERO)
        return self._expect_small(d, SIZE_ZERO)

    def test_filename_small(self):
        d = self._upload_file("Uploader-test_filename_small.data", SIZE_SMALL)
        return self._expect_small(d, SIZE_SMALL)

    def test_filename_large(self):
        d = self._upload_file("Uploader-test_filename_large.data", SIZE_LARGE)
        return self._expect_large(d, SIZE_LARGE)
383
class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
    """Uploads against fake servers that raise ServerError on their first
    or on a later allocate_buckets request."""

    def make_node(self, mode, num_servers=10):
        """Create self.node (a FakeClient with the given server modes) and
        an Uploader, self.u, attached to it."""
        self.node = FakeClient(mode, num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _check_large(self, newuri, size):
        """Assert that `newuri` is a CHK URI for a file of `size` bytes with
        a 16-byte storage index and a 16-byte key."""
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def _check_failure_message(self, res, expected_placement):
        """Check the failure delivered by shouldFail.

        `res` must be a one-element sequence holding the failure. Its
        message has to contain `expected_placement` and mention ServerError.
        """
        (f,) = res
        self.failUnlessIn(expected_placement, str(f.value))
        # there should also be a 'last failure was' message
        self.failUnlessIn("ServerError", str(f.value))

    def test_first_error(self):
        mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
        self.make_node(mode)
        self.set_encoding_parameters(k=25, happy=1, n=50)
        d = upload_data(self.u, DATA)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_first_error_all(self):
        self.make_node("first-fail")
        d = self.shouldFail(UploadUnhappinessError, "first_error_all",
                            "peer selection failed",
                            upload_data, self.u, DATA)
        d.addCallback(self._check_failure_message,
                      "placed 0 shares out of 100 total")
        return d

    def test_second_error(self):
        # we want to make sure we make it to a third pass. This means that
        # the first pass was insufficient to place all shares, and at least
        # one of second pass servers (other than the last one) accepted a
        # share (so we'll believe that a third pass will be useful). (if
        # everyone but the last server throws an error, then we'll send all
        # the remaining shares to the last server at the end of the second
        # pass, and if that succeeds, we won't make it to a third pass).
        #
        # we can achieve this 97.5% of the time by using 40 servers, having
        # 39 of them fail on the second request, leaving only one to succeed
        # on the second request. (we need to keep the number of servers low
        # enough to ensure a second pass with 100 shares).
        mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
        self.make_node(mode, 40)
        d = upload_data(self.u, DATA)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_second_error_all(self):
        self.make_node("second-fail")
        d = self.shouldFail(UploadUnhappinessError, "second_error_all",
                            "peer selection failed",
                            upload_data, self.u, DATA)
        d.addCallback(self._check_failure_message,
                      "placed 10 shares out of 100 total")
        return d
452
class FullServer(unittest.TestCase):
    """Upload against a FakeClient whose servers are all in "full" mode."""

    def setUp(self):
        uploader = upload.Uploader()
        uploader.running = True
        self.node = FakeClient(mode="full")
        uploader.parent = self.node
        self.u = uploader

    def _should_fail(self, f):
        """Assert that `f` is a Failure wrapping UploadUnhappinessError."""
        is_expected = isinstance(f, Failure) and f.check(UploadUnhappinessError)
        self.failUnless(is_expected, f)

    def test_data_large(self):
        d = upload_data(self.u, DATA)
        d.addBoth(self._should_fail)
        return d
468
class PeerSelection(unittest.TestCase, SetDEPMixin):
    """Check how shares are spread over the fake servers, and how many
    allocate_buckets queries each server receives, for various
    combinations of share count and server count.

    set_encoding_parameters() comes from SetDEPMixin.
    """

    def make_client(self, num_servers=50, mode="good"):
        """Create self.node (a FakeClient with `num_servers` servers in the
        given mode, or per-server mode mapping) and an Uploader, self.u,
        attached to it."""
        self.node = FakeClient(mode=mode, num_servers=num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def get_data(self, size):
        """Return the first `size` bytes of DATA."""
        return DATA[:size]

    def _check_large(self, newuri, size):
        """Assert that `newuri` is a CHK URI for a file of `size` bytes with
        a 16-byte storage index and a 16-byte key."""
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def _upload_large(self):
        """Upload all of DATA through self.u and check the resulting URI;
        returns the Deferred so callers can add their own checks."""
        d = upload_data(self.u, self.get_data(SIZE_LARGE))
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_one_each(self):
        # if we have 50 shares, and there are 50 peers, and they all accept a
        # share, we should get exactly one share per peer

        self.make_client()
        self.set_encoding_parameters(25, 30, 50)
        d = self._upload_large()
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 1)
                self.failUnlessEqual(p.queries, 1)
        d.addCallback(_check)
        return d

    def test_two_each(self):
        # if we have 100 shares, and there are 50 peers, and they all accept
        # all shares, we should get exactly two shares per peer

        self.make_client()
        # if there are 50 peers, then happy needs to be <= 50
        self.set_encoding_parameters(50, 50, 100)
        d = self._upload_large()
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 2)
                self.failUnlessEqual(p.queries, 2)
        d.addCallback(_check)
        return d

    def test_one_each_plus_one_extra(self):
        # if we have 51 shares, and there are 50 peers, then one peer gets
        # two shares and the rest get just one

        self.make_client()
        self.set_encoding_parameters(24, 41, 51)
        d = self._upload_large()
        def _check(res):
            got_one = []
            got_two = []
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnless(len(allocated) in (1,2), len(allocated))
                if len(allocated) == 1:
                    self.failUnlessEqual(p.queries, 1)
                    got_one.append(p)
                else:
                    self.failUnlessEqual(p.queries, 2)
                    got_two.append(p)
            self.failUnlessEqual(len(got_one), 49)
            self.failUnlessEqual(len(got_two), 1)
        d.addCallback(_check)
        return d

    def test_four_each(self):
        # if we have 200 shares, and there are 50 peers, then each peer gets
        # 4 shares. The design goal is to accomplish this with only two
        # queries per peer.

        self.make_client()
        # if there are 50 peers, then happy should be no more than 50 if
        # we want this to work.
        self.set_encoding_parameters(100, 50, 200)
        d = self._upload_large()
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 4)
                self.failUnlessEqual(p.queries, 2)
        d.addCallback(_check)
        return d

    def test_three_of_ten(self):
        # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
        # 4+4+2

        self.make_client(3)
        self.set_encoding_parameters(3, 3, 10)
        d = self._upload_large()
        def _check(res):
            counts = {}
            for p in self.node.last_peers:
                allocated = p.allocated
                counts[len(allocated)] = counts.get(len(allocated), 0) + 1
            # histogram[i] is the number of servers holding exactly i shares
            histogram = [counts.get(i, 0) for i in range(5)]
            self.failUnlessEqual(histogram, [0,0,0,2,1])
        d.addCallback(_check)
        return d

    def test_some_big_some_small(self):
        # 10 shares, 20 servers, but half the servers don't support a
        # share-size large enough for our file
        mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)])
        self.make_client(num_servers=20, mode=mode)

        self.set_encoding_parameters(3, 5, 10)
        d = self._upload_large()
        def _check(res):
            # we should have put one share each on the big peers, and zero
            # shares on the small peers
            total_allocated = 0
            for p in self.node.last_peers:
                if p.mode == "good":
                    self.failUnlessEqual(len(p.allocated), 1)
                elif p.mode == "small":
                    self.failUnlessEqual(len(p.allocated), 0)
                total_allocated += len(p.allocated)
            self.failUnlessEqual(total_allocated, 10)
        d.addCallback(_check)
        return d
629
630
class StorageIndex(unittest.TestCase):
    """Check which inputs change the storage index computed by
    upload.EncryptAnUploadable."""

    def test_params_must_matter(self):
        plaintext = "I am some data"

        def storage_index_for(convergence, bump_k=False):
            """Start computing the storage index of `plaintext` under the
            given convergence string (None means a random key), optionally
            with encoding_param_k raised by one; returns a Deferred."""
            u = upload.Data(plaintext, convergence=convergence)
            if bump_k:
                u.encoding_param_k = u.default_encoding_param_k + 1
            eu = upload.EncryptAnUploadable(u)
            return eu.get_storage_index()

        pending = [
            storage_index_for(""),
            # CHK means the same data should encrypt the same way
            storage_index_for(""),
            # but if we use a different convergence string it should be
            # different
            storage_index_for("wheee!"),
            # and if we add yet a different convergence it should be
            # different again
            storage_index_for("NOT wheee!"),
            # and if we use the first string again it should be the same as
            # last time
            storage_index_for("wheee!"),
            # and if we change the encoding parameters, it should be
            # different (from the same convergence string with different
            # encoding parameters)
            storage_index_for("", bump_k=True),
            # and if we use a random key, it should be different than the CHK
            storage_index_for(None),
            # and different from another instance
            storage_index_for(None),
            ]

        d = DeferredListShouldSucceed(pending)
        def _done(res):
            (si1, si1a, si1salt1, si1salt2,
             si1salt1a, si2, si3, si4) = res
            self.failUnlessEqual(si1, si1a)
            self.failIfEqual(si1, si2)
            self.failIfEqual(si1, si3)
            self.failIfEqual(si1, si4)
            self.failIfEqual(si3, si4)
            self.failIfEqual(si1salt1, si1)
            self.failIfEqual(si1salt1, si1salt2)
            self.failIfEqual(si1salt2, si1)
            self.failUnlessEqual(si1salt1, si1salt1a)
        d.addCallback(_done)
        return d
687
688 class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
689     ShouldFailMixin):
690     def _do_upload_with_broken_servers(self, servers_to_break):
691         """
692         I act like a normal upload, but before I send the results of
693         Tahoe2PeerSelector to the Encoder, I break the first servers_to_break
694         PeerTrackers in the used_peers part of the return result.
695         """
696         assert self.g, "I tried to find a grid at self.g, but failed"
697         broker = self.g.clients[0].storage_broker
698         sh     = self.g.clients[0]._secret_holder
699         data = upload.Data("data" * 10000, convergence="")
700         data.encoding_param_k = 3
701         data.encoding_param_happy = 4
702         data.encoding_param_n = 10
703         uploadable = upload.EncryptAnUploadable(data)
704         encoder = encode.Encoder()
705         encoder.set_encrypted_uploadable(uploadable)
706         status = upload.UploadStatus()
707         selector = upload.Tahoe2PeerSelector("dglev", "test", status)
708         storage_index = encoder.get_param("storage_index")
709         share_size = encoder.get_param("share_size")
710         block_size = encoder.get_param("block_size")
711         num_segments = encoder.get_param("num_segments")
712         d = selector.get_shareholders(broker, sh, storage_index,
713                                       share_size, block_size, num_segments,
714                                       10, 3, 4)
715         def _have_shareholders((used_peers, already_peers)):
716             assert servers_to_break <= len(used_peers)
717             for index in xrange(servers_to_break):
718                 server = list(used_peers)[index]
719                 for share in server.buckets.keys():
720                     server.buckets[share].abort()
721             buckets = {}
722             servermap = already_peers.copy()
723             for peer in used_peers:
724                 buckets.update(peer.buckets)
725                 for bucket in peer.buckets:
726                     servermap.setdefault(bucket, set()).add(peer.peerid)
727             encoder.set_shareholders(buckets, servermap)
728             d = encoder.start()
729             return d
730         d.addCallback(_have_shareholders)
731         return d
732
733
734     def _add_server(self, server_number, readonly=False):
735         assert self.g, "I tried to find a grid at self.g, but failed"
736         ss = self.g.make_server(server_number, readonly)
737         self.g.add_server(server_number, ss)
738
739
740     def _add_server_with_share(self, server_number, share_number=None,
741                                readonly=False):
742         self._add_server(server_number, readonly)
743         if share_number is not None:
744             self._copy_share_to_server(share_number, server_number)
745
746
747     def _copy_share_to_server(self, share_number, server_number):
748         ss = self.g.servers_by_number[server_number]
749         # Copy share i from the directory associated with the first
750         # storage server to the directory associated with this one.
751         assert self.g, "I tried to find a grid at self.g, but failed"
752         assert self.shares, "I tried to find shares at self.shares, but failed"
753         old_share_location = self.shares[share_number][2]
754         new_share_location = os.path.join(ss.storedir, "shares")
755         si = uri.from_string(self.uri).get_storage_index()
756         new_share_location = os.path.join(new_share_location,
757                                           storage_index_to_dir(si))
758         if not os.path.exists(new_share_location):
759             os.makedirs(new_share_location)
760         new_share_location = os.path.join(new_share_location,
761                                           str(share_number))
762         if old_share_location != new_share_location:
763             shutil.copy(old_share_location, new_share_location)
764         shares = self.find_shares(self.uri)
765         # Make sure that the storage server has the share.
766         self.failUnless((share_number, ss.my_nodeid, new_share_location)
767                         in shares)
768
769     def _setup_grid(self):
770         """
771         I set up a NoNetworkGrid with a single server and client.
772         """
773         self.set_up_grid(num_clients=1, num_servers=1)
774
775     def _setup_and_upload(self, **kwargs):
776         """
777         I set up a NoNetworkGrid with a single server and client,
778         upload a file to it, store its uri in self.uri, and store its
779         sharedata in self.shares.
780         """
781         self._setup_grid()
782         client = self.g.clients[0]
783         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
784         if "n" in kwargs and "k" in kwargs:
785             client.DEFAULT_ENCODING_PARAMETERS['k'] = kwargs['k']
786             client.DEFAULT_ENCODING_PARAMETERS['n'] = kwargs['n']
787         data = upload.Data("data" * 10000, convergence="")
788         self.data = data
789         d = client.upload(data)
790         def _store_uri(ur):
791             self.uri = ur.uri
792         d.addCallback(_store_uri)
793         d.addCallback(lambda ign:
794             self.find_shares(self.uri))
795         def _store_shares(shares):
796             self.shares = shares
797         d.addCallback(_store_shares)
798         return d
799
800
801     def test_configure_parameters(self):
802         self.basedir = self.mktemp()
803         hooks = {0: self._set_up_nodes_extra_config}
804         self.set_up_grid(client_config_hooks=hooks)
805         c0 = self.g.clients[0]
806
807         DATA = "data" * 100
808         u = upload.Data(DATA, convergence="")
809         d = c0.upload(u)
810         d.addCallback(lambda ur: c0.create_node_from_uri(ur.uri))
811         m = monitor.Monitor()
812         d.addCallback(lambda fn: fn.check(m))
813         def _check(cr):
814             data = cr.get_data()
815             self.failUnlessEqual(data["count-shares-needed"], 7)
816             self.failUnlessEqual(data["count-shares-expected"], 12)
817         d.addCallback(_check)
818         return d
819
820
821     def _setUp(self, ns):
822         # Used by test_happy_semantics and test_preexisting_share_behavior
823         # to set up the grid.
824         self.node = FakeClient(mode="good", num_servers=ns)
825         self.u = upload.Uploader()
826         self.u.running = True
827         self.u.parent = self.node
828
829
830     def test_happy_semantics(self):
831         self._setUp(2)
832         DATA = upload.Data("kittens" * 10000, convergence="")
833         # These parameters are unsatisfiable with only 2 servers.
834         self.set_encoding_parameters(k=3, happy=5, n=10)
835         d = self.shouldFail(UploadUnhappinessError, "test_happy_semantics",
836                             "shares could be placed or found on only 2 "
837                             "server(s). We were asked to place shares on "
838                             "at least 5 server(s) such that any 3 of them "
839                             "have enough shares to recover the file",
840                             self.u.upload, DATA)
841         # Let's reset the client to have 10 servers
842         d.addCallback(lambda ign:
843             self._setUp(10))
844         # These parameters are satisfiable with 10 servers.
845         d.addCallback(lambda ign:
846             self.set_encoding_parameters(k=3, happy=5, n=10))
847         d.addCallback(lambda ign:
848             self.u.upload(DATA))
849         # Let's reset the client to have 7 servers
850         # (this is less than n, but more than h)
851         d.addCallback(lambda ign:
852             self._setUp(7))
853         # These parameters are satisfiable with 7 servers.
854         d.addCallback(lambda ign:
855             self.set_encoding_parameters(k=3, happy=5, n=10))
856         d.addCallback(lambda ign:
857             self.u.upload(DATA))
858         return d
859
860     def test_aborted_shares(self):
861         self.basedir = "upload/EncodingParameters/aborted_shares"
862         self.set_up_grid(num_servers=4)
863         c = self.g.clients[0]
864         DATA = upload.Data(100* "kittens", convergence="")
865         # These parameters are unsatisfiable with only 4 servers, but should
866         # work with 5, as long as the original 4 are not stuck in the open
867         # BucketWriter state (open() but not
868         parms = {"k":2, "happy":5, "n":5, "max_segment_size": 1*MiB}
869         c.DEFAULT_ENCODING_PARAMETERS = parms
870         d = self.shouldFail(UploadUnhappinessError, "test_aborted_shares",
871                             "shares could be placed on only 4 "
872                             "server(s) such that any 2 of them have enough "
873                             "shares to recover the file, but we were asked "
874                             "to place shares on at least 5 such servers",
875                             c.upload, DATA)
876         # now add the 5th server
877         d.addCallback(lambda ign: self._add_server(4, False))
878         # and this time the upload ought to succeed
879         d.addCallback(lambda ign: c.upload(DATA))
880         return d
881
882
883     def test_problem_layout_comment_52(self):
884         def _basedir():
885             self.basedir = self.mktemp()
886         _basedir()
887         # This scenario is at
888         # http://allmydata.org/trac/tahoe/ticket/778#comment:52
889         #
890         # The scenario in comment:52 proposes that we have a layout
891         # like:
892         # server 0: shares 1 - 9
893         # server 1: share 0, read-only
894         # server 2: share 0, read-only
895         # server 3: share 0, read-only
896         # To get access to the shares, we will first upload to one
897         # server, which will then have shares 0 - 9. We'll then
898         # add three new servers, configure them to not accept any new
899         # shares, then write share 0 directly into the serverdir of each,
900         # and then remove share 0 from server 0 in the same way.
901         # Then each of servers 1 - 3 will report that they have share 0,
902         # and will not accept any new share, while server 0 will report that
903         # it has shares 1 - 9 and will accept new shares.
904         # We'll then set 'happy' = 4, and see that an upload fails
905         # (as it should)
906         d = self._setup_and_upload()
907         d.addCallback(lambda ign:
908             self._add_server_with_share(server_number=1, share_number=0,
909                                         readonly=True))
910         d.addCallback(lambda ign:
911             self._add_server_with_share(server_number=2, share_number=0,
912                                         readonly=True))
913         d.addCallback(lambda ign:
914             self._add_server_with_share(server_number=3, share_number=0,
915                                         readonly=True))
916         # Remove the first share from server 0.
917         def _remove_share_0_from_server_0():
918             share_location = self.shares[0][2]
919             os.remove(share_location)
920         d.addCallback(lambda ign:
921             _remove_share_0_from_server_0())
922         # Set happy = 4 in the client.
923         def _prepare():
924             client = self.g.clients[0]
925             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
926             return client
927         d.addCallback(lambda ign:
928             _prepare())
929         # Uploading data should fail
930         d.addCallback(lambda client:
931             self.shouldFail(UploadUnhappinessError,
932                             "test_problem_layout_comment_52_test_1",
933                             "shares could be placed or found on 4 server(s), "
934                             "but they are not spread out evenly enough to "
935                             "ensure that any 3 of these servers would have "
936                             "enough shares to recover the file. "
937                             "We were asked to place shares on at "
938                             "least 4 servers such that any 3 of them have "
939                             "enough shares to recover the file",
940                             client.upload, upload.Data("data" * 10000,
941                                                        convergence="")))
942
943         # Do comment:52, but like this:
944         # server 2: empty
945         # server 3: share 0, read-only
946         # server 1: share 0, read-only
947         # server 0: shares 0-9
948         d.addCallback(lambda ign:
949             _basedir())
950         d.addCallback(lambda ign:
951             self._setup_and_upload())
952         d.addCallback(lambda ign:
953             self._add_server(server_number=2))
954         d.addCallback(lambda ign:
955             self._add_server_with_share(server_number=3, share_number=0,
956                                         readonly=True))
957         d.addCallback(lambda ign:
958             self._add_server_with_share(server_number=1, share_number=0,
959                                         readonly=True))
960         def _prepare2():
961             client = self.g.clients[0]
962             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
963             return client
964         d.addCallback(lambda ign:
965             _prepare2())
966         d.addCallback(lambda client:
967             self.shouldFail(UploadUnhappinessError,
968                             "test_problem_layout_comment_52_test_2",
969                             "shares could be placed on only 3 server(s) such "
970                             "that any 3 of them have enough shares to recover "
971                             "the file, but we were asked to place shares on "
972                             "at least 4 such servers.",
973                             client.upload, upload.Data("data" * 10000,
974                                                        convergence="")))
975         return d
976
977
    def test_problem_layout_comment_53(self):
        # This scenario is at
        # http://allmydata.org/trac/tahoe/ticket/778#comment:53
        #
        # Three layouts are built one after the other on the same Deferred
        # chain; each gets a fresh basedir and a fresh initial upload, and
        # each ends with a reupload at happy = 4 that is expected to
        # succeed.
        #
        # Set up the grid to have one server
        def _change_basedir(ign):
            self.basedir = self.mktemp()
        _change_basedir(None)
        # We start by uploading all of the shares to one server.
        # Next, we'll add three new servers to our NoNetworkGrid. We'll add
        # one share from our initial upload to each of these.
        # The counterintuitive ordering of the share numbers is to deal with
        # the permuting of these servers -- distributing the shares this
        # way ensures that the Tahoe2PeerSelector sees them in the order
        # described below.
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1))
        # So, we now have the following layout:
        # server 0: shares 0 - 9
        # server 1: share 2
        # server 2: share 0
        # server 3: share 1
        # We change the 'happy' parameter in the client to 4.
        # The Tahoe2PeerSelector will see the peers permuted as:
        # 2, 3, 1, 0
        # Ideally, a reupload of our original data should work.
        def _reset_encoding_parameters(ign, happy=4):
            # Set 'happy' on the first client and hand that client to the
            # next callback, which uses it to upload.
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
            return client
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))


        # This scenario is basically comment:53, but changed so that the
        # Tahoe2PeerSelector sees the server with all of the shares before
        # any of the other servers.
        # The layout built by the calls below is:
        # server 2: shares 0 - 9
        # server 3: share 1
        # server 1: share 2
        # server 4: share 0
        # (An earlier version of this comment listed shares 0, 1 and 2 for
        # servers 3, 1 and 4; the numbers above are the ones the code
        # actually places.)
        # The Tahoe2PeerSelector sees the peers permuted as:
        # 2, 3, 1, 4
        # Note that server 0 has been replaced by server 4; this makes it
        # easier to ensure that the last server seen by Tahoe2PeerSelector
        # has only one share.
        d.addCallback(_change_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2))
        # Copy all of the other shares to server number 2
        def _copy_shares(ign):
            # Shares 0 through 9; this helper is reused by the third
            # scenario below.
            for i in xrange(0, 10):
                self._copy_share_to_server(i, 2)
        d.addCallback(_copy_shares)
        # Remove the first server, and add a placeholder with share 0
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=4, share_number=0))
        # Now try uploading.
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))


        # Try the same thing, but with empty servers after the first one
        # We want to make sure that Tahoe2PeerSelector will redistribute
        # shares as necessary, not simply discover an existing layout.
        # The layout is:
        # server 2: shares 0 - 9
        # server 3: empty
        # server 1: empty
        # server 4: empty
        d.addCallback(_change_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server(server_number=2))
        d.addCallback(lambda ign:
            self._add_server(server_number=3))
        d.addCallback(lambda ign:
            self._add_server(server_number=1))
        d.addCallback(lambda ign:
            self._add_server(server_number=4))
        d.addCallback(_copy_shares)
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        # Make sure that only as many shares as necessary to satisfy
        # servers of happiness were pushed.
        d.addCallback(lambda results:
            self.failUnlessEqual(results.pushed_shares, 3))
        return d
1086
1087
1088     def test_happiness_with_some_readonly_peers(self):
1089         # Try the following layout
1090         # server 2: shares 0-9
1091         # server 4: share 0, read-only
1092         # server 3: share 1, read-only
1093         # server 1: share 2, read-only
1094         self.basedir = self.mktemp()
1095         d = self._setup_and_upload()
1096         d.addCallback(lambda ign:
1097             self._add_server_with_share(server_number=2, share_number=0))
1098         d.addCallback(lambda ign:
1099             self._add_server_with_share(server_number=3, share_number=1,
1100                                         readonly=True))
1101         d.addCallback(lambda ign:
1102             self._add_server_with_share(server_number=1, share_number=2,
1103                                         readonly=True))
1104         # Copy all of the other shares to server number 2
1105         def _copy_shares(ign):
1106             for i in xrange(1, 10):
1107                 self._copy_share_to_server(i, 2)
1108         d.addCallback(_copy_shares)
1109         # Remove server 0, and add another in its place
1110         d.addCallback(lambda ign:
1111             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1112         d.addCallback(lambda ign:
1113             self._add_server_with_share(server_number=4, share_number=0,
1114                                         readonly=True))
1115         def _reset_encoding_parameters(ign, happy=4):
1116             client = self.g.clients[0]
1117             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1118             return client
1119         d.addCallback(_reset_encoding_parameters)
1120         d.addCallback(lambda client:
1121             client.upload(upload.Data("data" * 10000, convergence="")))
1122         return d
1123
1124
1125     def test_happiness_with_all_readonly_peers(self):
1126         # server 3: share 1, read-only
1127         # server 1: share 2, read-only
1128         # server 2: shares 0-9, read-only
1129         # server 4: share 0, read-only
1130         # The idea with this test is to make sure that the survey of
1131         # read-only peers doesn't undercount servers of happiness
1132         self.basedir = self.mktemp()
1133         d = self._setup_and_upload()
1134         d.addCallback(lambda ign:
1135             self._add_server_with_share(server_number=4, share_number=0,
1136                                         readonly=True))
1137         d.addCallback(lambda ign:
1138             self._add_server_with_share(server_number=3, share_number=1,
1139                                         readonly=True))
1140         d.addCallback(lambda ign:
1141             self._add_server_with_share(server_number=1, share_number=2,
1142                                         readonly=True))
1143         d.addCallback(lambda ign:
1144             self._add_server_with_share(server_number=2, share_number=0,
1145                                         readonly=True))
1146         def _copy_shares(ign):
1147             for i in xrange(1, 10):
1148                 self._copy_share_to_server(i, 2)
1149         d.addCallback(_copy_shares)
1150         d.addCallback(lambda ign:
1151             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1152         def _reset_encoding_parameters(ign, happy=4):
1153             client = self.g.clients[0]
1154             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1155             return client
1156         d.addCallback(_reset_encoding_parameters)
1157         d.addCallback(lambda client:
1158             client.upload(upload.Data("data" * 10000, convergence="")))
1159         return d
1160
1161
1162     def test_dropped_servers_in_encoder(self):
1163         # The Encoder does its own "servers_of_happiness" check if it
1164         # happens to lose a bucket during an upload (it assumes that
1165         # the layout presented to it satisfies "servers_of_happiness"
1166         # until a failure occurs)
1167         #
1168         # This test simulates an upload where servers break after peer
1169         # selection, but before they are written to.
1170         def _set_basedir(ign=None):
1171             self.basedir = self.mktemp()
1172         _set_basedir()
1173         d = self._setup_and_upload();
1174         # Add 5 servers
1175         def _do_server_setup(ign):
1176             self._add_server(server_number=1)
1177             self._add_server(server_number=2)
1178             self._add_server(server_number=3)
1179             self._add_server(server_number=4)
1180             self._add_server(server_number=5)
1181         d.addCallback(_do_server_setup)
1182         # remove the original server
1183         # (necessary to ensure that the Tahoe2PeerSelector will distribute
1184         #  all the shares)
1185         def _remove_server(ign):
1186             server = self.g.servers_by_number[0]
1187             self.g.remove_server(server.my_nodeid)
1188         d.addCallback(_remove_server)
1189         # This should succeed; we still have 4 servers, and the
1190         # happiness of the upload is 4.
1191         d.addCallback(lambda ign:
1192             self._do_upload_with_broken_servers(1))
1193         # Now, do the same thing over again, but drop 2 servers instead
1194         # of 1. This should fail, because servers_of_happiness is 4 and
1195         # we can't satisfy that.
1196         d.addCallback(_set_basedir)
1197         d.addCallback(lambda ign:
1198             self._setup_and_upload())
1199         d.addCallback(_do_server_setup)
1200         d.addCallback(_remove_server)
1201         d.addCallback(lambda ign:
1202             self.shouldFail(UploadUnhappinessError,
1203                             "test_dropped_servers_in_encoder",
1204                             "shares could be placed on only 3 server(s) "
1205                             "such that any 3 of them have enough shares to "
1206                             "recover the file, but we were asked to place "
1207                             "shares on at least 4",
1208                             self._do_upload_with_broken_servers, 2))
1209         # Now do the same thing over again, but make some of the servers
1210         # readonly, break some of the ones that aren't, and make sure that
1211         # happiness accounting is preserved.
1212         d.addCallback(_set_basedir)
1213         d.addCallback(lambda ign:
1214             self._setup_and_upload())
1215         def _do_server_setup_2(ign):
1216             self._add_server(1)
1217             self._add_server(2)
1218             self._add_server(3)
1219             self._add_server_with_share(4, 7, readonly=True)
1220             self._add_server_with_share(5, 8, readonly=True)
1221         d.addCallback(_do_server_setup_2)
1222         d.addCallback(_remove_server)
1223         d.addCallback(lambda ign:
1224             self._do_upload_with_broken_servers(1))
1225         d.addCallback(_set_basedir)
1226         d.addCallback(lambda ign:
1227             self._setup_and_upload())
1228         d.addCallback(_do_server_setup_2)
1229         d.addCallback(_remove_server)
1230         d.addCallback(lambda ign:
1231             self.shouldFail(UploadUnhappinessError,
1232                             "test_dropped_servers_in_encoder",
1233                             "shares could be placed on only 3 server(s) "
1234                             "such that any 3 of them have enough shares to "
1235                             "recover the file, but we were asked to place "
1236                             "shares on at least 4",
1237                             self._do_upload_with_broken_servers, 2))
1238         return d
1239
1240
1241     def test_merge_peers(self):
1242         # merge_peers merges a list of used_peers and a dict of
1243         # shareid -> peerid mappings.
1244         shares = {
1245                     1 : set(["server1"]),
1246                     2 : set(["server2"]),
1247                     3 : set(["server3"]),
1248                     4 : set(["server4", "server5"]),
1249                     5 : set(["server1", "server2"]),
1250                  }
1251         # if not provided with a used_peers argument, it should just
1252         # return the first argument unchanged.
1253         self.failUnlessEqual(shares, merge_peers(shares, set([])))
1254         class FakePeerTracker:
1255             pass
1256         trackers = []
1257         for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1258             t = FakePeerTracker()
1259             t.peerid = server
1260             t.buckets = [i]
1261             trackers.append(t)
1262         expected = {
1263                     1 : set(["server1"]),
1264                     2 : set(["server2"]),
1265                     3 : set(["server3"]),
1266                     4 : set(["server4", "server5"]),
1267                     5 : set(["server1", "server2", "server5"]),
1268                     6 : set(["server6"]),
1269                     7 : set(["server7"]),
1270                     8 : set(["server8"]),
1271                    }
1272         self.failUnlessEqual(expected, merge_peers(shares, set(trackers)))
1273         shares2 = {}
1274         expected = {
1275                     5 : set(["server5"]),
1276                     6 : set(["server6"]),
1277                     7 : set(["server7"]),
1278                     8 : set(["server8"]),
1279                    }
1280         self.failUnlessEqual(expected, merge_peers(shares2, set(trackers)))
1281         shares3 = {}
1282         trackers = []
1283         expected = {}
1284         for (i, server) in [(i, "server%d" % i) for i in xrange(10)]:
1285             shares3[i] = set([server])
1286             t = FakePeerTracker()
1287             t.peerid = server
1288             t.buckets = [i]
1289             trackers.append(t)
1290             expected[i] = set([server])
1291         self.failUnlessEqual(expected, merge_peers(shares3, set(trackers)))
1292
1293
    def test_servers_of_happiness_utility_function(self):
        # These tests are concerned with the servers_of_happiness()
        # utility function, and its underlying matching algorithm. Other
        # aspects of the servers_of_happiness behavior are tested
        # elsewhere. These tests exist to ensure that
        # servers_of_happiness doesn't under or overcount the happiness
        # value for given inputs.

        # servers_of_happiness expects a dict of
        # shnum => set(peerids) as a preexisting shares argument.
        test1 = {
                 1 : set(["server1"]),
                 2 : set(["server2"]),
                 3 : set(["server3"]),
                 4 : set(["server4"])
                }
        happy = servers_of_happiness(test1)
        self.failUnlessEqual(4, happy)
        test1[4] = set(["server1"])
        # We've added a duplicate server, so now servers_of_happiness
        # should be 3 instead of 4.
        happy = servers_of_happiness(test1)
        self.failUnlessEqual(3, happy)
        # The second argument of merge_peers should be a set of
        # objects with peerid and buckets as attributes. In actual use,
        # these will be PeerTracker instances, but for testing it is fine
        # to make a FakePeerTracker whose job is to hold those instance
        # variables to test that part.
        class FakePeerTracker:
            pass
        trackers = []
        # One tracker per server 5 - 8, each holding the share with the
        # same number as the server.
        for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
            t = FakePeerTracker()
            t.peerid = server
            t.buckets = [i]
            trackers.append(t)
        # Recall that test1 is a server layout with servers_of_happiness
        # = 3.  Since there isn't any overlap between the shnum ->
        # set([peerid]) correspondences in test1 and those in trackers,
        # the result here should be 7.
        # NOTE(review): test1 is passed to merge_peers again further down;
        # if merge_peers modifies its first argument in place, the order
        # of these calls matters -- confirm before reordering.
        test2 = merge_peers(test1, set(trackers))
        happy = servers_of_happiness(test2)
        self.failUnlessEqual(7, happy)
        # Now add an overlapping server to trackers. This is redundant,
        # so it should not cause the previously reported happiness value
        # to change.
        t = FakePeerTracker()
        t.peerid = "server1"
        t.buckets = [1]
        trackers.append(t)
        test2 = merge_peers(test1, set(trackers))
        happy = servers_of_happiness(test2)
        self.failUnlessEqual(7, happy)
        # An empty layout has no happiness at all.
        test = {}
        happy = servers_of_happiness(test)
        self.failUnlessEqual(0, happy)
        # Test a more substantial overlap between the trackers and the
        # existing assignments.
        test = {
            1 : set(['server1']),
            2 : set(['server2']),
            3 : set(['server3']),
            4 : set(['server4']),
        }
        trackers = []
        t = FakePeerTracker()
        t.peerid = 'server5'
        t.buckets = [4]
        trackers.append(t)
        t = FakePeerTracker()
        t.peerid = 'server6'
        t.buckets = [3, 5]
        trackers.append(t)
        # The value returned by servers_of_happiness is the size
        # of a maximum matching in the bipartite graph that
        # servers_of_happiness() makes between peerids and share
        # numbers. It should find something like this:
        # (server 1, share 1)
        # (server 2, share 2)
        # (server 3, share 3)
        # (server 5, share 4)
        # (server 6, share 5)
        #
        # and, since there are 5 edges in this matching, it should
        # return 5.
        test2 = merge_peers(test, set(trackers))
        happy = servers_of_happiness(test2)
        self.failUnlessEqual(5, happy)
        # Zooko's first puzzle:
        # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:156)
        #
        # server 1: shares 0, 1
        # server 2: shares 1, 2
        # server 3: share 2
        #
        # This should yield happiness of 3.
        test = {
            0 : set(['server1']),
            1 : set(['server1', 'server2']),
            2 : set(['server2', 'server3']),
        }
        self.failUnlessEqual(3, servers_of_happiness(test))
        # Zooko's second puzzle:
        # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:158)
        #
        # server 1: shares 0, 1
        # server 2: share 1
        #
        # This should yield happiness of 2.
        test = {
            0 : set(['server1']),
            1 : set(['server1', 'server2']),
        }
        self.failUnlessEqual(2, servers_of_happiness(test))
1408
1409
1410     def test_shares_by_server(self):
1411         test = dict([(i, set(["server%d" % i])) for i in xrange(1, 5)])
1412         sbs = shares_by_server(test)
1413         self.failUnlessEqual(set([1]), sbs["server1"])
1414         self.failUnlessEqual(set([2]), sbs["server2"])
1415         self.failUnlessEqual(set([3]), sbs["server3"])
1416         self.failUnlessEqual(set([4]), sbs["server4"])
1417         test1 = {
1418                     1 : set(["server1"]),
1419                     2 : set(["server1"]),
1420                     3 : set(["server1"]),
1421                     4 : set(["server2"]),
1422                     5 : set(["server2"])
1423                 }
1424         sbs = shares_by_server(test1)
1425         self.failUnlessEqual(set([1, 2, 3]), sbs["server1"])
1426         self.failUnlessEqual(set([4, 5]), sbs["server2"])
1427         # This should fail unless the peerid part of the mapping is a set
1428         test2 = {1: "server1"}
1429         self.shouldFail(AssertionError,
1430                        "test_shares_by_server",
1431                        "",
1432                        shares_by_server, test2)
1433
1434
1435     def test_existing_share_detection(self):
1436         self.basedir = self.mktemp()
1437         d = self._setup_and_upload()
1438         # Our final setup should look like this:
1439         # server 1: shares 0 - 9, read-only
1440         # server 2: empty
1441         # server 3: empty
1442         # server 4: empty
1443         # The purpose of this test is to make sure that the peer selector
1444         # knows about the shares on server 1, even though it is read-only.
1445         # It used to simply filter these out, which would cause the test
1446         # to fail when servers_of_happiness = 4.
1447         d.addCallback(lambda ign:
1448             self._add_server_with_share(1, 0, True))
1449         d.addCallback(lambda ign:
1450             self._add_server(2))
1451         d.addCallback(lambda ign:
1452             self._add_server(3))
1453         d.addCallback(lambda ign:
1454             self._add_server(4))
1455         def _copy_shares(ign):
1456             for i in xrange(1, 10):
1457                 self._copy_share_to_server(i, 1)
1458         d.addCallback(_copy_shares)
1459         d.addCallback(lambda ign:
1460             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1461         def _prepare_client(ign):
1462             client = self.g.clients[0]
1463             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1464             return client
1465         d.addCallback(_prepare_client)
1466         d.addCallback(lambda client:
1467             client.upload(upload.Data("data" * 10000, convergence="")))
1468         return d
1469
1470
1471     def test_query_counting(self):
1472         # If peer selection fails, Tahoe2PeerSelector prints out a lot
1473         # of helpful diagnostic information, including query stats.
1474         # This test helps make sure that that information is accurate.
1475         self.basedir = self.mktemp()
1476         d = self._setup_and_upload()
1477         def _setup(ign):
1478             for i in xrange(1, 11):
1479                 self._add_server(server_number=i)
1480             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1481             c = self.g.clients[0]
1482             # We set happy to an unsatisfiable value so that we can check the
1483             # counting in the exception message. The same progress message
1484             # is also used when the upload is successful, but in that case it
1485             # only gets written to a log, so we can't see what it says.
1486             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1487             return c
1488         d.addCallback(_setup)
1489         d.addCallback(lambda c:
1490             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1491                             "10 queries placed some shares",
1492                             c.upload, upload.Data("data" * 10000,
1493                                                   convergence="")))
1494         # Now try with some readonly servers. We want to make sure that
1495         # the readonly peer share discovery phase is counted correctly.
1496         def _reset(ign):
1497             self.basedir = self.mktemp()
1498             self.g = None
1499         d.addCallback(_reset)
1500         d.addCallback(lambda ign:
1501             self._setup_and_upload())
1502         def _then(ign):
1503             for i in xrange(1, 11):
1504                 self._add_server(server_number=i)
1505             self._add_server(server_number=11, readonly=True)
1506             self._add_server(server_number=12, readonly=True)
1507             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1508             c = self.g.clients[0]
1509             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1510             return c
1511         d.addCallback(_then)
1512         d.addCallback(lambda c:
1513             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1514                             "2 placed none (of which 2 placed none due to "
1515                             "the server being full",
1516                             c.upload, upload.Data("data" * 10000,
1517                                                   convergence="")))
1518         # Now try the case where the upload process finds a bunch of the
1519         # shares that it wants to place on the first server, including
1520         # the one that it wanted to allocate there. Though no shares will
1521         # be allocated in this request, it should still be called
1522         # productive, since it caused some homeless shares to be
1523         # removed.
1524         d.addCallback(_reset)
1525         d.addCallback(lambda ign:
1526             self._setup_and_upload())
1527
1528         def _next(ign):
1529             for i in xrange(1, 11):
1530                 self._add_server(server_number=i)
1531             # Copy all of the shares to server 9, since that will be
1532             # the first one that the selector sees.
1533             for i in xrange(10):
1534                 self._copy_share_to_server(i, 9)
1535             # Remove server 0, and its contents
1536             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1537             # Make happiness unsatisfiable
1538             c = self.g.clients[0]
1539             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1540             return c
1541         d.addCallback(_next)
1542         d.addCallback(lambda c:
1543             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1544                             "1 queries placed some shares",
1545                             c.upload, upload.Data("data" * 10000,
1546                                                   convergence="")))
1547         return d
1548
1549
1550     def test_upper_limit_on_readonly_queries(self):
1551         self.basedir = self.mktemp()
1552         d = self._setup_and_upload()
1553         def _then(ign):
1554             for i in xrange(1, 11):
1555                 self._add_server(server_number=i, readonly=True)
1556             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1557             c = self.g.clients[0]
1558             c.DEFAULT_ENCODING_PARAMETERS['k'] = 2
1559             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1560             c.DEFAULT_ENCODING_PARAMETERS['n'] = 4
1561             return c
1562         d.addCallback(_then)
1563         d.addCallback(lambda client:
1564             self.shouldFail(UploadUnhappinessError,
1565                             "test_upper_limit_on_readonly_queries",
1566                             "sent 8 queries to 8 peers",
1567                             client.upload,
1568                             upload.Data('data' * 10000, convergence="")))
1569         return d
1570
1571
1572     def test_exception_messages_during_peer_selection(self):
1573         # server 1: read-only, no shares
1574         # server 2: read-only, no shares
1575         # server 3: read-only, no shares
1576         # server 4: read-only, no shares
1577         # server 5: read-only, no shares
1578         # This will fail, but we want to make sure that the log messages
1579         # are informative about why it has failed.
1580         self.basedir = self.mktemp()
1581         d = self._setup_and_upload()
1582         d.addCallback(lambda ign:
1583             self._add_server(server_number=1, readonly=True))
1584         d.addCallback(lambda ign:
1585             self._add_server(server_number=2, readonly=True))
1586         d.addCallback(lambda ign:
1587             self._add_server(server_number=3, readonly=True))
1588         d.addCallback(lambda ign:
1589             self._add_server(server_number=4, readonly=True))
1590         d.addCallback(lambda ign:
1591             self._add_server(server_number=5, readonly=True))
1592         d.addCallback(lambda ign:
1593             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1594         def _reset_encoding_parameters(ign, happy=4):
1595             client = self.g.clients[0]
1596             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1597             return client
1598         d.addCallback(_reset_encoding_parameters)
1599         d.addCallback(lambda client:
1600             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1601                             "placed 0 shares out of 10 "
1602                             "total (10 homeless), want to place shares on at "
1603                             "least 4 servers such that any 3 of them have "
1604                             "enough shares to recover the file, "
1605                             "sent 5 queries to 5 peers, 0 queries placed "
1606                             "some shares, 5 placed none "
1607                             "(of which 5 placed none due to the server being "
1608                             "full and 0 placed none due to an error)",
1609                             client.upload,
1610                             upload.Data("data" * 10000, convergence="")))
1611
1612
1613         # server 1: read-only, no shares
1614         # server 2: broken, no shares
1615         # server 3: read-only, no shares
1616         # server 4: read-only, no shares
1617         # server 5: read-only, no shares
1618         def _reset(ign):
1619             self.basedir = self.mktemp()
1620         d.addCallback(_reset)
1621         d.addCallback(lambda ign:
1622             self._setup_and_upload())
1623         d.addCallback(lambda ign:
1624             self._add_server(server_number=1, readonly=True))
1625         d.addCallback(lambda ign:
1626             self._add_server(server_number=2))
1627         def _break_server_2(ign):
1628             server = self.g.servers_by_number[2].my_nodeid
1629             # We have to break the server in servers_by_id,
1630             # because the one in servers_by_number isn't wrapped,
1631             # and doesn't look at its broken attribute when answering
1632             # queries.
1633             self.g.servers_by_id[server].broken = True
1634         d.addCallback(_break_server_2)
1635         d.addCallback(lambda ign:
1636             self._add_server(server_number=3, readonly=True))
1637         d.addCallback(lambda ign:
1638             self._add_server(server_number=4, readonly=True))
1639         d.addCallback(lambda ign:
1640             self._add_server(server_number=5, readonly=True))
1641         d.addCallback(lambda ign:
1642             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1643         d.addCallback(_reset_encoding_parameters)
1644         d.addCallback(lambda client:
1645             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1646                             "placed 0 shares out of 10 "
1647                             "total (10 homeless), want to place shares on at "
1648                             "least 4 servers such that any 3 of them have "
1649                             "enough shares to recover the file, "
1650                             "sent 5 queries to 5 peers, 0 queries placed "
1651                             "some shares, 5 placed none "
1652                             "(of which 4 placed none due to the server being "
1653                             "full and 1 placed none due to an error)",
1654                             client.upload,
1655                             upload.Data("data" * 10000, convergence="")))
1656         # server 0, server 1 = empty, accepting shares
1657         # This should place all of the shares, but still fail with happy=4.
1658         # We want to make sure that the exception message is worded correctly.
1659         d.addCallback(_reset)
1660         d.addCallback(lambda ign:
1661             self._setup_grid())
1662         d.addCallback(lambda ign:
1663             self._add_server(server_number=1))
1664         d.addCallback(_reset_encoding_parameters)
1665         d.addCallback(lambda client:
1666             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1667                             "shares could be placed or found on only 2 "
1668                             "server(s). We were asked to place shares on at "
1669                             "least 4 server(s) such that any 3 of them have "
1670                             "enough shares to recover the file.",
1671                             client.upload, upload.Data("data" * 10000,
1672                                                        convergence="")))
1673         # servers 0 - 4 = empty, accepting shares
1674         # This too should place all the shares, and this too should fail,
1675         # but since the effective happiness is more than the k encoding
1676         # parameter, it should trigger a different error message than the one
1677         # above.
1678         d.addCallback(_reset)
1679         d.addCallback(lambda ign:
1680             self._setup_grid())
1681         d.addCallback(lambda ign:
1682             self._add_server(server_number=1))
1683         d.addCallback(lambda ign:
1684             self._add_server(server_number=2))
1685         d.addCallback(lambda ign:
1686             self._add_server(server_number=3))
1687         d.addCallback(lambda ign:
1688             self._add_server(server_number=4))
1689         d.addCallback(_reset_encoding_parameters, happy=7)
1690         d.addCallback(lambda client:
1691             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1692                             "shares could be placed on only 5 server(s) such "
1693                             "that any 3 of them have enough shares to recover "
1694                             "the file, but we were asked to place shares on "
1695                             "at least 7 such servers.",
1696                             client.upload, upload.Data("data" * 10000,
1697                                                        convergence="")))
1698         # server 0: shares 0 - 9
1699         # server 1: share 0, read-only
1700         # server 2: share 0, read-only
1701         # server 3: share 0, read-only
1702         # This should place all of the shares, but fail with happy=4.
1703         # Since the number of servers with shares is more than the number
1704         # necessary to reconstitute the file, this will trigger a different
1705         # error message than either of those above.
1706         d.addCallback(_reset)
1707         d.addCallback(lambda ign:
1708             self._setup_and_upload())
1709         d.addCallback(lambda ign:
1710             self._add_server_with_share(server_number=1, share_number=0,
1711                                         readonly=True))
1712         d.addCallback(lambda ign:
1713             self._add_server_with_share(server_number=2, share_number=0,
1714                                         readonly=True))
1715         d.addCallback(lambda ign:
1716             self._add_server_with_share(server_number=3, share_number=0,
1717                                         readonly=True))
1718         d.addCallback(_reset_encoding_parameters, happy=7)
1719         d.addCallback(lambda client:
1720             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1721                             "shares could be placed or found on 4 server(s), "
1722                             "but they are not spread out evenly enough to "
1723                             "ensure that any 3 of these servers would have "
1724                             "enough shares to recover the file. We were asked "
1725                             "to place shares on at least 7 servers such that "
1726                             "any 3 of them have enough shares to recover the "
1727                             "file",
1728                             client.upload, upload.Data("data" * 10000,
1729                                                        convergence="")))
1730         return d
1731
1732
1733     def test_problem_layout_comment_187(self):
1734         # #778 comment 187 broke an initial attempt at a share
1735         # redistribution algorithm. This test is here to demonstrate the
1736         # breakage, and to test that subsequent algorithms don't also
1737         # break in the same way.
1738         self.basedir = self.mktemp()
1739         d = self._setup_and_upload(k=2, n=3)
1740
1741         # server 1: shares 0, 1, 2, readonly
1742         # server 2: share 0, readonly
1743         # server 3: share 0
1744         def _setup(ign):
1745             self._add_server_with_share(server_number=1, share_number=0,
1746                                         readonly=True)
1747             self._add_server_with_share(server_number=2, share_number=0,
1748                                         readonly=True)
1749             self._add_server_with_share(server_number=3, share_number=0)
1750             # Copy shares
1751             self._copy_share_to_server(1, 1)
1752             self._copy_share_to_server(2, 1)
1753             # Remove server 0
1754             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1755             client = self.g.clients[0]
1756             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 3
1757             return client
1758
1759         d.addCallback(_setup)
1760         d.addCallback(lambda client:
1761             client.upload(upload.Data("data" * 10000, convergence="")))
1762         return d
1763     test_problem_layout_comment_187.todo = "this isn't fixed yet"
1764
1765
1766     def test_upload_succeeds_with_some_homeless_shares(self):
1767         # If the upload is forced to stop trying to place shares before
1768         # it has placed (or otherwise accounted) for all of them, but it
1769         # has placed enough to satisfy the upload health criteria that
1770         # we're using, it should still succeed.
1771         self.basedir = self.mktemp()
1772         d = self._setup_and_upload()
1773         def _server_setup(ign):
1774             # Add four servers so that we have a layout like this:
1775             # server 1: share 0, read-only
1776             # server 2: share 1, read-only
1777             # server 3: share 2, read-only
1778             # server 4: share 3, read-only
1779             # If we set happy = 4, the upload will manage to satisfy
1780             # servers of happiness, but not place all of the shares; we
1781             # want to test that the upload is declared successful in
1782             # this case.
1783             self._add_server_with_share(server_number=1, share_number=0,
1784                                         readonly=True)
1785             self._add_server_with_share(server_number=2, share_number=1,
1786                                         readonly=True)
1787             self._add_server_with_share(server_number=3, share_number=2,
1788                                         readonly=True)
1789             self._add_server_with_share(server_number=4, share_number=3,
1790                                         readonly=True)
1791             # Remove server 0.
1792             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1793             # Set the client appropriately
1794             c = self.g.clients[0]
1795             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1796             return c
1797         d.addCallback(_server_setup)
1798         d.addCallback(lambda client:
1799             client.upload(upload.Data("data" * 10000, convergence="")))
1800         return d
1801
1802
1803     def test_uploader_skips_over_servers_with_only_one_share(self):
1804         # We want to make sure that the redistribution logic ignores
1805         # servers with only one share, since placing these shares
1806         # elsewhere will at best keep happiness the same as it was, and
1807         # at worst hurt it.
1808         self.basedir = self.mktemp()
1809         d = self._setup_and_upload()
1810         def _server_setup(ign):
1811             # Add some servers so that the upload will need to
1812             # redistribute, but will first pass over a couple of servers
1813             # that don't have enough shares to redistribute before
1814             # finding one that does have shares to redistribute.
1815             self._add_server_with_share(server_number=1, share_number=0)
1816             self._add_server_with_share(server_number=2, share_number=2)
1817             self._add_server_with_share(server_number=3, share_number=1)
1818             self._add_server_with_share(server_number=8, share_number=4)
1819             self._add_server_with_share(server_number=5, share_number=5)
1820             self._add_server_with_share(server_number=10, share_number=7)
1821             for i in xrange(4):
1822                 self._copy_share_to_server(i, 2)
1823             return self.g.clients[0]
1824         d.addCallback(_server_setup)
1825         d.addCallback(lambda client:
1826             client.upload(upload.Data("data" * 10000, convergence="")))
1827         return d
1828
1829
1830     def test_peer_selector_bucket_abort(self):
1831         # If peer selection for an upload fails due to an unhappy
1832         # layout, the peer selection process should abort the buckets it
1833         # allocates before failing, so that the space can be re-used.
1834         self.basedir = self.mktemp()
1835         self.set_up_grid(num_servers=5)
1836
1837         # Try to upload a file with happy=7, which is unsatisfiable with
1838         # the current grid. This will fail, but should not take up any
1839         # space on the storage servers after it fails.
1840         client = self.g.clients[0]
1841         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
1842         d = defer.succeed(None)
1843         d.addCallback(lambda ignored:
1844             self.shouldFail(UploadUnhappinessError,
1845                             "test_peer_selection_bucket_abort",
1846                             "",
1847                             client.upload, upload.Data("data" * 10000,
1848                                                        convergence="")))
1849         # wait for the abort messages to get there.
1850         def _turn_barrier(res):
1851             return fireEventually(res)
1852         d.addCallback(_turn_barrier)
1853         def _then(ignored):
1854             for server in self.g.servers_by_number.values():
1855                 self.failUnlessEqual(server.allocated_size(), 0)
1856         d.addCallback(_then)
1857         return d
1858
1859
1860     def test_encoder_bucket_abort(self):
1861         # If enough servers die in the process of encoding and uploading
1862         # a file to make the layout unhappy, we should cancel the
1863         # newly-allocated buckets before dying.
1864         self.basedir = self.mktemp()
1865         self.set_up_grid(num_servers=4)
1866
1867         client = self.g.clients[0]
1868         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 7
1869
1870         d = defer.succeed(None)
1871         d.addCallback(lambda ignored:
1872             self.shouldFail(UploadUnhappinessError,
1873                             "test_encoder_bucket_abort",
1874                             "",
1875                             self._do_upload_with_broken_servers, 1))
1876         def _turn_barrier(res):
1877             return fireEventually(res)
1878         d.addCallback(_turn_barrier)
1879         def _then(ignored):
1880             for server in self.g.servers_by_number.values():
1881                 self.failUnlessEqual(server.allocated_size(), 0)
1882         d.addCallback(_then)
1883         return d
1884
1885
1886     def _set_up_nodes_extra_config(self, clientdir):
1887         cfgfn = os.path.join(clientdir, "tahoe.cfg")
1888         oldcfg = open(cfgfn, "r").read()
1889         f = open(cfgfn, "wt")
1890         f.write(oldcfg)
1891         f.write("\n")
1892         f.write("[client]\n")
1893         f.write("shares.needed = 7\n")
1894         f.write("shares.total = 12\n")
1895         f.write("\n")
1896         f.close()
1897         return None
1898
1899 # TODO:
1900 #  upload with exactly 75 peers (shares_of_happiness)
1901 #  have a download fail
1902 #  cancel a download (need to implement more cancel stuff)