import os, shutil
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.python.failure import Failure
from twisted.python import log
from twisted.internet import defer
from foolscap.api import fireEventually

import allmydata # for __full_version__
from allmydata import uri, monitor, client
from allmydata.immutable import upload, encode
from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import DeferredListShouldSucceed
from allmydata.util.happinessutil import servers_of_happiness, \
                                         shares_by_server, merge_peers
from no_network import GridTestMixin
from common_util import ShouldFailMixin
from allmydata.storage_client import StorageFarmBroker
from allmydata.storage.server import storage_index_to_dir

MiB = 1024*1024

def extract_uri(results):
    return results.uri

# Some of these took longer than 480 seconds on Zandr's arm box, but this may
# have been due to an earlier test ERROR'ing out due to timeout, which seems
# to screw up subsequent tests.
timeout = 960
class Uploadable(unittest.TestCase):
    def shouldEqual(self, data, expected):
        self.failUnless(isinstance(data, list))
        for e in data:
            self.failUnless(isinstance(e, str))
        s = "".join(data)
        self.failUnlessEqual(s, expected)

    def test_filehandle_random_key(self):
        return self._test_filehandle(convergence=None)

    def test_filehandle_convergent_encryption(self):
        return self._test_filehandle(convergence="some convergence string")

    def _test_filehandle(self, convergence):
        s = StringIO("a"*41)
        u = upload.FileHandle(s, convergence=convergence)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
        d.addCallback(lambda res: s.close()) # that privilege is reserved for us
        return d

    def test_filename(self):
        basedir = "upload/Uploadable/test_filename"
        os.makedirs(basedir)
        fn = os.path.join(basedir, "file")
        f = open(fn, "w")
        f.write("a"*41)
        f.close()
        u = upload.FileName(fn, convergence=None)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close())
        return d

    def test_data(self):
        s = "a"*41
        u = upload.Data(s, convergence=None)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close())
        return d

class ServerError(Exception):
    pass

class SetDEPMixin:
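    # A note on the encoding parameters used by this mixin and throughout
    # these tests: "k" is the number of shares needed to recover a file,
    # "n" is the total number of shares produced, and "happy" is the
    # servers-of-happiness threshold: an upload succeeds only if shares are
    # placed on at least "happy" servers such that any "k" of those servers
    # have enough shares to recover the file.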
    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        p = {"k": k,
             "happy": happy,
             "n": n,
             "max_segment_size": max_segsize,
             }
        self.node.DEFAULT_ENCODING_PARAMETERS = p

class FakeStorageServer:
    def __init__(self, mode):
        self.mode = mode
        self.allocated = []
        self.queries = 0
        self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                         { "maximum-immutable-share-size": 2**32 },
                         "application-version": str(allmydata.__full_version__),
                         }
        if mode == "small":
            self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                             { "maximum-immutable-share-size": 10 },
                             "application-version": str(allmydata.__full_version__),
                             }


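    # callRemote() mimics foolscap's RemoteReference.callRemote():
    # fireEventually() returns a Deferred that fires on a later reactor
    # turn, so the fake server answers asynchronously, the way a real
    # remote call would.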
    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self, methname)
            return meth(*args, **kwargs)
        d = fireEventually()
        d.addCallback(lambda res: _call())
        return d

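    # allocate_buckets() mimics the remote storage-server API: it returns a
    # two-tuple of (alreadygot, bucketwriters), i.e. the set of share
    # numbers this server already holds plus a dict mapping each
    # newly-allocated share number to a writer. "full" mode allocates
    # nothing, and "already got them" claims to hold every requested share.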
    def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
                         sharenums, share_size, canary):
        #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
        if self.mode == "first-fail":
            if self.queries == 0:
                raise ServerError
        if self.mode == "second-fail":
            if self.queries == 1:
                raise ServerError
        self.queries += 1
        if self.mode == "full":
            return (set(), {},)
        elif self.mode == "already got them":
            return (set(sharenums), {},)
        else:
            for shnum in sharenums:
                self.allocated.append( (storage_index, shnum) )
            return (set(),
                    dict([( shnum, FakeBucketWriter(share_size) )
                          for shnum in sharenums]),
                    )

class FakeBucketWriter:
    # a diagnostic version of storageserver.BucketWriter
    def __init__(self, size):
        self.data = StringIO()
        self.closed = False
        self._size = size

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self, "remote_" + methname)
            return meth(*args, **kwargs)
        d = fireEventually()
        d.addCallback(lambda res: _call())
        return d

    def remote_write(self, offset, data):
        precondition(not self.closed)
        precondition(offset >= 0)
        precondition(offset+len(data) <= self._size,
                     "offset=%d + data=%d > size=%d" %
                     (offset, len(data), self._size))
        self.data.seek(offset)
        self.data.write(data)

    def remote_close(self):
        precondition(not self.closed)
        self.closed = True

    def remote_abort(self):
        log.err(RuntimeError("uh oh, I was asked to abort"))

class FakeClient:
    DEFAULT_ENCODING_PARAMETERS = {"k":25,
                                   "happy": 25,
                                   "n": 100,
                                   "max_segment_size": 1*MiB,
                                   }
    def __init__(self, mode="good", num_servers=50):
        self.num_servers = num_servers
        if type(mode) is str:
            mode = dict([i,mode] for i in range(num_servers))
        peers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
                  for fakeid in range(self.num_servers) ]
        self.storage_broker = StorageFarmBroker(None, permute_peers=True)
        for (serverid, server) in peers:
            self.storage_broker.test_add_server(serverid, server)
        self.last_peers = [p[1] for p in peers]

    def log(self, *args, **kwargs):
        pass
    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS
    def get_storage_broker(self):
        return self.storage_broker
    _secret_holder = client.SecretHolder("lease secret", "convergence secret")

class GotTooFarError(Exception):
    pass

class GiganticUploadable(upload.FileHandle):
    def __init__(self, size):
        self._size = size
        self._fp = 0

    def get_encryption_key(self):
        return defer.succeed("\x00" * 16)
    def get_size(self):
        return defer.succeed(self._size)
    def read(self, length):
        left = self._size - self._fp
        length = min(left, length)
        self._fp += length
        if self._fp > 1000000:
            # terminate the test early.
            raise GotTooFarError("we shouldn't be allowed to get this far")
        return defer.succeed(["\x00" * length])
    def close(self):
        pass

DATA = """
Once upon a time, there was a beautiful princess named Buttercup. She lived
in a magical land where every file was stored securely among millions of
machines, and nobody ever worried about their data being lost ever again.
The End.
"""
assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
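# Uploads at or below upload.Uploader.URI_LIT_SIZE_THRESHOLD are stored
# entirely inside a LiteralFileURI instead of being encoded into shares, so
# DATA must exceed the threshold for the tests below to exercise real share
# placement.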

SIZE_ZERO = 0
SIZE_SMALL = 16
SIZE_LARGE = len(DATA)

def upload_data(uploader, data):
    u = upload.Data(data, convergence=None)
    return uploader.upload(u)
def upload_filename(uploader, filename):
    u = upload.FileName(filename, convergence=None)
    return uploader.upload(u)
def upload_filehandle(uploader, fh):
    u = upload.FileHandle(fh, convergence=None)
    return uploader.upload(u)

class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
    def setUp(self):
        self.node = FakeClient(mode="good")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _check_small(self, newuri, size):
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.LiteralFileURI))
        self.failUnlessEqual(len(u.data), size)

    def _check_large(self, newuri, size):
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def get_data(self, size):
        return DATA[:size]

    def test_too_large(self):
        # we've removed the 4GiB share size limit (see ticket #346 for
        # details), but still have an 8-byte field, so the limit is now
        # 2**64, so make sure we reject files larger than that.
        k = 3; happy = 7; n = 10
        self.set_encoding_parameters(k, happy, n)
        big = k*(2**64)
        data1 = GiganticUploadable(big)
        d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
                            "This file is too large to be uploaded (data_size)",
                            self.u.upload, data1)
        data2 = GiganticUploadable(big-3)
        d.addCallback(lambda res:
                      self.shouldFail(FileTooLargeError,
                                      "test_too_large-data2",
                                      "This file is too large to be uploaded (offsets)",
                                      self.u.upload, data2))
        # I don't know where the actual limit is... it depends upon how large
        # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
        return d

    def test_data_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_data_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_data_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_data_large_odd_segments(self):
        data = self.get_data(SIZE_LARGE)
        segsize = int(SIZE_LARGE / 2.5)
        # we want 3 segments, since that's not a power of two
        self.set_encoding_parameters(25, 25, 100, segsize)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_filehandle_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_filehandle_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_filehandle_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_filename_zero(self):
        fn = "Uploader-test_filename_zero.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_ZERO)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_filename_small(self):
        fn = "Uploader-test_filename_small.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_SMALL)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_filename_large(self):
        fn = "Uploader-test_filename_large.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_LARGE)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
    def make_node(self, mode, num_servers=10):
        self.node = FakeClient(mode, num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _check_large(self, newuri, size):
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def test_first_error(self):
        mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
        self.make_node(mode)
        self.set_encoding_parameters(k=25, happy=1, n=50)
        d = upload_data(self.u, DATA)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_first_error_all(self):
        self.make_node("first-fail")
        d = self.shouldFail(UploadUnhappinessError, "first_error_all",
                            "peer selection failed",
                            upload_data, self.u, DATA)
        def _check((f,)):
            self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
            # there should also be a 'last failure was' message
            self.failUnlessIn("ServerError", str(f.value))
        d.addCallback(_check)
        return d

    def test_second_error(self):
        # we want to make sure we make it to a third pass. This means that
        # the first pass was insufficient to place all shares, and at least
        # one of the second-pass servers (other than the last one) accepted
        # a share (so we'll believe that a third pass will be useful). (if
        # everyone but the last server throws an error, then we'll send all
        # the remaining shares to the last server at the end of the second
        # pass, and if that succeeds, we won't make it to a third pass).
        #
        # we can achieve this 97.5% of the time by using 40 servers and
        # having 39 of them fail on the second request, leaving only one to
        # succeed on that request. (we need to keep the number of servers
        # low enough to ensure a second pass with 100 shares).
        mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
        self.make_node(mode, 40)
        d = upload_data(self.u, DATA)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_second_error_all(self):
        self.make_node("second-fail")
        d = self.shouldFail(UploadUnhappinessError, "second_error_all",
                            "peer selection failed",
                            upload_data, self.u, DATA)
        def _check((f,)):
            self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
            # there should also be a 'last failure was' message
            self.failUnlessIn("ServerError", str(f.value))
        d.addCallback(_check)
        return d

class FullServer(unittest.TestCase):
    def setUp(self):
        self.node = FakeClient(mode="full")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _should_fail(self, f):
        self.failUnless(isinstance(f, Failure) and f.check(UploadUnhappinessError), f)

    def test_data_large(self):
        data = DATA
        d = upload_data(self.u, data)
        d.addBoth(self._should_fail)
        return d

class PeerSelection(unittest.TestCase):

    def make_client(self, num_servers=50):
        self.node = FakeClient(mode="good", num_servers=num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def get_data(self, size):
        return DATA[:size]

    def _check_large(self, newuri, size):
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        p = {"k": k,
             "happy": happy,
             "n": n,
             "max_segment_size": max_segsize,
             }
        self.node.DEFAULT_ENCODING_PARAMETERS = p

    def test_one_each(self):
        # if we have 50 shares, and there are 50 peers, and they all accept a
        # share, we should get exactly one share per peer

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(25, 30, 50)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 1)
                self.failUnlessEqual(p.queries, 1)
        d.addCallback(_check)
        return d

    def test_two_each(self):
        # if we have 100 shares, and there are 50 peers, and they all accept
        # all shares, we should get exactly two shares per peer

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        # if there are 50 peers, then happy needs to be <= 50
        self.set_encoding_parameters(50, 50, 100)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 2)
                self.failUnlessEqual(p.queries, 2)
        d.addCallback(_check)
        return d

    def test_one_each_plus_one_extra(self):
        # if we have 51 shares, and there are 50 peers, then one peer gets
        # two shares and the rest get just one

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(24, 41, 51)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            got_one = []
            got_two = []
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnless(len(allocated) in (1,2), len(allocated))
                if len(allocated) == 1:
                    self.failUnlessEqual(p.queries, 1)
                    got_one.append(p)
                else:
                    self.failUnlessEqual(p.queries, 2)
                    got_two.append(p)
            self.failUnlessEqual(len(got_one), 49)
            self.failUnlessEqual(len(got_two), 1)
        d.addCallback(_check)
        return d

    def test_four_each(self):
        # if we have 200 shares, and there are 50 peers, then each peer gets
        # 4 shares. The design goal is to accomplish this with only two
        # queries per peer.

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        # if there are 50 peers, then happy should be no more than 50 if
        # we want this to work.
        self.set_encoding_parameters(100, 50, 200)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 4)
                self.failUnlessEqual(p.queries, 2)
        d.addCallback(_check)
        return d

    def test_three_of_ten(self):
        # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
        # 4+4+2

        self.make_client(3)
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 3, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            counts = {}
            for p in self.node.last_peers:
                allocated = p.allocated
                counts[len(allocated)] = counts.get(len(allocated), 0) + 1
            histogram = [counts.get(i, 0) for i in range(5)]
            self.failUnlessEqual(histogram, [0,0,0,2,1])
        d.addCallback(_check)
        return d

    def test_some_big_some_small(self):
        # 10 shares, 20 servers, but half the servers don't support a
        # share-size large enough for our file
        mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)])
        self.node = FakeClient(mode, num_servers=20)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 5, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            # we should have put one share each on the big peers, and zero
            # shares on the small peers
            total_allocated = 0
            for p in self.node.last_peers:
                if p.mode == "good":
                    self.failUnlessEqual(len(p.allocated), 1)
                elif p.mode == "small":
                    self.failUnlessEqual(len(p.allocated), 0)
                total_allocated += len(p.allocated)
            self.failUnlessEqual(total_allocated, 10)
        d.addCallback(_check)
        return d


class StorageIndex(unittest.TestCase):
    def test_params_must_matter(self):
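        # Background: the storage index is derived from the encryption key.
        # For convergent (CHK) uploads, the key is itself derived from the
        # convergence secret, the encoding parameters, and the file
        # contents, so changing any one of those should change the storage
        # index, while repeating the same inputs should reproduce it. With
        # convergence=None a random key is chosen, so every such upload
        # should get a fresh storage index.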
        DATA = "I am some data"
        u = upload.Data(DATA, convergence="")
        eu = upload.EncryptAnUploadable(u)
        d1 = eu.get_storage_index()

        # CHK means the same data should encrypt the same way
        u = upload.Data(DATA, convergence="")
        eu = upload.EncryptAnUploadable(u)
        d1a = eu.get_storage_index()

        # but if we use a different convergence string it should be different
        u = upload.Data(DATA, convergence="wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt1 = eu.get_storage_index()

        # and if we add yet a different convergence it should be different again
        u = upload.Data(DATA, convergence="NOT wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt2 = eu.get_storage_index()

        # and if we use the first string again it should be the same as last time
        u = upload.Data(DATA, convergence="wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt1a = eu.get_storage_index()

        # and if we change the encoding parameters, it should be different
        # (from the same convergence string with different encoding parameters)
        u = upload.Data(DATA, convergence="")
        u.encoding_param_k = u.default_encoding_param_k + 1
        eu = upload.EncryptAnUploadable(u)
        d2 = eu.get_storage_index()

        # and if we use a random key, it should be different than the CHK
        u = upload.Data(DATA, convergence=None)
        eu = upload.EncryptAnUploadable(u)
        d3 = eu.get_storage_index()
        # and different from another instance
        u = upload.Data(DATA, convergence=None)
        eu = upload.EncryptAnUploadable(u)
        d4 = eu.get_storage_index()

        d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
        def _done(res):
            si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
            self.failUnlessEqual(si1, si1a)
            self.failIfEqual(si1, si2)
            self.failIfEqual(si1, si3)
            self.failIfEqual(si1, si4)
            self.failIfEqual(si3, si4)
            self.failIfEqual(si1salt1, si1)
            self.failIfEqual(si1salt1, si1salt2)
            self.failIfEqual(si1salt2, si1)
            self.failUnlessEqual(si1salt1, si1salt1a)
        d.addCallback(_done)
        return d

class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
    ShouldFailMixin):
    def _do_upload_with_broken_servers(self, servers_to_break):
        """
        I act like a normal upload, but before I send the results of
        Tahoe2PeerSelector to the Encoder, I break the first servers_to_break
        PeerTrackers in the used_peers part of the return result.
        """
        assert self.g, "I tried to find a grid at self.g, but failed"
        broker = self.g.clients[0].storage_broker
        sh     = self.g.clients[0]._secret_holder
        data = upload.Data("data" * 10000, convergence="")
        data.encoding_param_k = 3
        data.encoding_param_happy = 4
        data.encoding_param_n = 10
        uploadable = upload.EncryptAnUploadable(data)
        encoder = encode.Encoder()
        encoder.set_encrypted_uploadable(uploadable)
        status = upload.UploadStatus()
        selector = upload.Tahoe2PeerSelector("dglev", "test", status)
        storage_index = encoder.get_param("storage_index")
        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        d = selector.get_shareholders(broker, sh, storage_index,
                                      share_size, block_size, num_segments,
                                      10, 3, 4)
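        # The trailing (10, 3, 4) arguments mirror the encoding parameters
        # set on the uploadable above: presumably total shares n=10, needed
        # shares k=3, and servers-of-happiness happy=4.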
        def _have_shareholders((used_peers, already_peers)):
            assert servers_to_break <= len(used_peers)
            for index in xrange(servers_to_break):
                server = list(used_peers)[index]
                for share in server.buckets.keys():
                    server.buckets[share].abort()
            buckets = {}
            servermap = already_peers.copy()
            for peer in used_peers:
                buckets.update(peer.buckets)
                for bucket in peer.buckets:
                    servermap.setdefault(bucket, set()).add(peer.peerid)
            encoder.set_shareholders(buckets, servermap)
            d = encoder.start()
            return d
        d.addCallback(_have_shareholders)
        return d


    def _add_server(self, server_number, readonly=False):
        assert self.g, "I tried to find a grid at self.g, but failed"
        ss = self.g.make_server(server_number, readonly)
        self.g.add_server(server_number, ss)


    def _add_server_with_share(self, server_number, share_number=None,
                               readonly=False):
        self._add_server(server_number, readonly)
        if share_number is not None:
            self._copy_share_to_server(share_number, server_number)


    def _copy_share_to_server(self, share_number, server_number):
        ss = self.g.servers_by_number[server_number]
        # Copy share i from the directory associated with the first
        # storage server to the directory associated with this one.
        assert self.g, "I tried to find a grid at self.g, but failed"
        assert self.shares, "I tried to find shares at self.shares, but failed"
        old_share_location = self.shares[share_number][2]
        new_share_location = os.path.join(ss.storedir, "shares")
        si = uri.from_string(self.uri).get_storage_index()
        new_share_location = os.path.join(new_share_location,
                                          storage_index_to_dir(si))
        if not os.path.exists(new_share_location):
            os.makedirs(new_share_location)
        new_share_location = os.path.join(new_share_location,
                                          str(share_number))
        if old_share_location != new_share_location:
            shutil.copy(old_share_location, new_share_location)
        shares = self.find_shares(self.uri)
        # Make sure that the storage server has the share.
        self.failUnless((share_number, ss.my_nodeid, new_share_location)
                        in shares)

    def _setup_grid(self):
        """
        I set up a NoNetworkGrid with a single server and client.
        """
        self.set_up_grid(num_clients=1, num_servers=1)

    def _setup_and_upload(self, **kwargs):
        """
        I set up a NoNetworkGrid with a single server and client,
        upload a file to it, store its uri in self.uri, and store its
        sharedata in self.shares.
        """
        self._setup_grid()
        client = self.g.clients[0]
        client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
        if "n" in kwargs and "k" in kwargs:
            client.DEFAULT_ENCODING_PARAMETERS['k'] = kwargs['k']
            client.DEFAULT_ENCODING_PARAMETERS['n'] = kwargs['n']
        data = upload.Data("data" * 10000, convergence="")
        self.data = data
        d = client.upload(data)
        def _store_uri(ur):
            self.uri = ur.uri
        d.addCallback(_store_uri)
        d.addCallback(lambda ign:
            self.find_shares(self.uri))
        def _store_shares(shares):
            self.shares = shares
        d.addCallback(_store_shares)
        return d


    def test_configure_parameters(self):
        self.basedir = self.mktemp()
        hooks = {0: self._set_up_nodes_extra_config}
        self.set_up_grid(client_config_hooks=hooks)
        c0 = self.g.clients[0]

        DATA = "data" * 100
        u = upload.Data(DATA, convergence="")
        d = c0.upload(u)
        d.addCallback(lambda ur: c0.create_node_from_uri(ur.uri))
        m = monitor.Monitor()
        d.addCallback(lambda fn: fn.check(m))
        def _check(cr):
            data = cr.get_data()
            self.failUnlessEqual(data["count-shares-needed"], 7)
            self.failUnlessEqual(data["count-shares-expected"], 12)
        d.addCallback(_check)
        return d


    def _setUp(self, ns):
        # Used by test_happy_semantics and test_preexisting_share_behavior
        # to set up the grid.
        self.node = FakeClient(mode="good", num_servers=ns)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node


    def test_happy_semantics(self):
        self._setUp(2)
        DATA = upload.Data("kittens" * 10000, convergence="")
        # These parameters are unsatisfiable with only 2 servers.
        self.set_encoding_parameters(k=3, happy=5, n=10)
        d = self.shouldFail(UploadUnhappinessError, "test_happy_semantics",
                            "shares could be placed or found on only 2 "
                            "server(s). We were asked to place shares on "
                            "at least 5 server(s) such that any 3 of them "
                            "have enough shares to recover the file",
                            self.u.upload, DATA)
        # Let's reset the client to have 10 servers
        d.addCallback(lambda ign:
            self._setUp(10))
        # These parameters are satisfiable with 10 servers.
        d.addCallback(lambda ign:
            self.set_encoding_parameters(k=3, happy=5, n=10))
        d.addCallback(lambda ign:
            self.u.upload(DATA))
        # Let's reset the client to have 7 servers
        # (this is less than n, but more than h)
        d.addCallback(lambda ign:
            self._setUp(7))
        # These parameters are satisfiable with 7 servers.
        d.addCallback(lambda ign:
            self.set_encoding_parameters(k=3, happy=5, n=10))
        d.addCallback(lambda ign:
            self.u.upload(DATA))
        return d


    def test_problem_layout_comment_52(self):
        def _basedir():
            self.basedir = self.mktemp()
        _basedir()
        # This scenario is at
        # http://allmydata.org/trac/tahoe/ticket/778#comment:52
        #
        # The scenario in comment:52 proposes that we have a layout
        # like:
        # server 0: shares 1 - 9
        # server 1: share 0, read-only
        # server 2: share 0, read-only
        # server 3: share 0, read-only
        # To get access to the shares, we will first upload to one
        # server, which will then have shares 0 - 9. We'll then
        # add three new servers, configure them to not accept any new
        # shares, then write share 0 directly into the serverdir of each,
        # and then remove share 0 from server 0 in the same way.
        # Then each of servers 1 - 3 will report that they have share 0,
        # and will not accept any new share, while server 0 will report that
        # it has shares 1 - 9 and will accept new shares.
        # We'll then set 'happy' = 4, and see that an upload fails
        # (as it should)
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=0,
                                        readonly=True))
        # Remove the first share from server 0.
        def _remove_share_0_from_server_0():
            share_location = self.shares[0][2]
            os.remove(share_location)
        d.addCallback(lambda ign:
            _remove_share_0_from_server_0())
        # Set happy = 4 in the client.
        def _prepare():
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
            return client
        d.addCallback(lambda ign:
            _prepare())
        # Uploading data should fail
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError,
                            "test_problem_layout_comment_52_test_1",
                            "shares could be placed or found on 4 server(s), "
                            "but they are not spread out evenly enough to "
                            "ensure that any 3 of these servers would have "
                            "enough shares to recover the file. "
                            "We were asked to place shares on at "
                            "least 4 servers such that any 3 of them have "
                            "enough shares to recover the file",
                            client.upload, upload.Data("data" * 10000,
                                                       convergence="")))

        # Do comment:52, but like this:
        # server 2: empty
        # server 3: share 0, read-only
        # server 1: share 0, read-only
        # server 0: shares 0-9
        d.addCallback(lambda ign:
            _basedir())
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server(server_number=2))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=0,
                                        readonly=True))
        def _prepare2():
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
            return client
        d.addCallback(lambda ign:
            _prepare2())
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError,
                            "test_problem_layout_comment_52_test_2",
                            "shares could be placed on only 3 server(s) such "
                            "that any 3 of them have enough shares to recover "
                            "the file, but we were asked to place shares on "
                            "at least 4 such servers.",
                            client.upload, upload.Data("data" * 10000,
                                                       convergence="")))
        return d


    def test_problem_layout_comment_53(self):
        # This scenario is at
        # http://allmydata.org/trac/tahoe/ticket/778#comment:53
        #
        # Set up the grid to have one server
        def _change_basedir(ign):
            self.basedir = self.mktemp()
        _change_basedir(None)
        # We start by uploading all of the shares to one server.
        # Next, we'll add three new servers to our NoNetworkGrid. We'll add
        # one share from our initial upload to each of these.
        # The counterintuitive ordering of the share numbers is to deal with
        # the permuting of these servers -- distributing the shares this
        # way ensures that the Tahoe2PeerSelector sees them in the order
        # described below.
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1))
        # So, we now have the following layout:
        # server 0: shares 0 - 9
        # server 1: share 2
        # server 2: share 0
        # server 3: share 1
        # We change the 'happy' parameter in the client to 4.
        # The Tahoe2PeerSelector will see the peers permuted as:
        # 2, 3, 1, 0
        # Ideally, a reupload of our original data should work.
        def _reset_encoding_parameters(ign, happy=4):
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
            return client
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))


        # This scenario is basically comment:53, but changed so that the
        # Tahoe2PeerSelector sees the server with all of the shares before
        # any of the other servers.
        # The layout is:
        # server 2: shares 0 - 9
        # server 3: share 0
        # server 1: share 1
        # server 4: share 2
        # The Tahoe2PeerSelector sees the peers permuted as:
        # 2, 3, 1, 4
        # Note that server 0 has been replaced by server 4; this makes it
        # easier to ensure that the last server seen by Tahoe2PeerSelector
        # has only one share.
        d.addCallback(_change_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2))
        # Copy all of the other shares to server number 2
        def _copy_shares(ign):
            for i in xrange(0, 10):
                self._copy_share_to_server(i, 2)
        d.addCallback(_copy_shares)
        # Remove the first server, and add a placeholder with share 0
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=4, share_number=0))
        # Now try uploading.
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))


        # Try the same thing, but with empty servers after the first one
        # We want to make sure that Tahoe2PeerSelector will redistribute
        # shares as necessary, not simply discover an existing layout.
        # The layout is:
        # server 2: shares 0 - 9
        # server 3: empty
        # server 1: empty
        # server 4: empty
        d.addCallback(_change_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server(server_number=2))
        d.addCallback(lambda ign:
            self._add_server(server_number=3))
        d.addCallback(lambda ign:
            self._add_server(server_number=1))
        d.addCallback(lambda ign:
            self._add_server(server_number=4))
        d.addCallback(_copy_shares)
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        # Make sure that only as many shares as necessary to satisfy
        # servers of happiness were pushed: server 2 already holds all ten
        # shares, so only three new shares (one for each empty server) are
        # needed to reach happy=4.
        d.addCallback(lambda results:
            self.failUnlessEqual(results.pushed_shares, 3))
        return d


    def test_happiness_with_some_readonly_peers(self):
        # Try the following layout
        # server 2: shares 0-9
        # server 4: share 0, read-only
        # server 3: share 1, read-only
        # server 1: share 2, read-only
        self.basedir = self.mktemp()
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2,
                                        readonly=True))
        # Copy all of the other shares to server number 2
        def _copy_shares(ign):
            for i in xrange(1, 10):
                self._copy_share_to_server(i, 2)
        d.addCallback(_copy_shares)
        # Remove server 0, and add another in its place
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=4, share_number=0,
                                        readonly=True))
        def _reset_encoding_parameters(ign, happy=4):
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
            return client
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        return d


    def test_happiness_with_all_readonly_peers(self):
        # server 3: share 1, read-only
        # server 1: share 2, read-only
        # server 2: shares 0-9, read-only
        # server 4: share 0, read-only
        # The idea with this test is to make sure that the survey of
        # read-only peers doesn't undercount servers of happiness
        self.basedir = self.mktemp()
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=4, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0,
                                        readonly=True))
        def _copy_shares(ign):
            for i in xrange(1, 10):
                self._copy_share_to_server(i, 2)
        d.addCallback(_copy_shares)
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        def _reset_encoding_parameters(ign, happy=4):
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
            return client
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        return d


    def test_dropped_servers_in_encoder(self):
        # The Encoder does its own "servers_of_happiness" check if it
        # happens to lose a bucket during an upload (it assumes that
        # the layout presented to it satisfies "servers_of_happiness"
        # until a failure occurs)
        #
        # This test simulates an upload where servers break after peer
        # selection, but before they are written to.
        def _set_basedir(ign=None):
            self.basedir = self.mktemp()
        _set_basedir()
        d = self._setup_and_upload()
        # Add 5 servers
        def _do_server_setup(ign):
            self._add_server(server_number=1)
            self._add_server(server_number=2)
            self._add_server(server_number=3)
            self._add_server(server_number=4)
            self._add_server(server_number=5)
        d.addCallback(_do_server_setup)
        # remove the original server
        # (necessary to ensure that the Tahoe2PeerSelector will distribute
        #  all the shares)
        def _remove_server(ign):
            server = self.g.servers_by_number[0]
            self.g.remove_server(server.my_nodeid)
        d.addCallback(_remove_server)
        # This should succeed; we still have 4 servers, and the
        # happiness of the upload is 4.
        d.addCallback(lambda ign:
            self._do_upload_with_broken_servers(1))
        # Now, do the same thing over again, but drop 2 servers instead
        # of 1. This should fail, because servers_of_happiness is 4 and
        # we can't satisfy that.
        d.addCallback(_set_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(_do_server_setup)
        d.addCallback(_remove_server)
        d.addCallback(lambda ign:
            self.shouldFail(UploadUnhappinessError,
                            "test_dropped_servers_in_encoder",
                            "shares could be placed on only 3 server(s) "
                            "such that any 3 of them have enough shares to "
                            "recover the file, but we were asked to place "
                            "shares on at least 4",
                            self._do_upload_with_broken_servers, 2))
        # Now do the same thing over again, but make some of the servers
        # readonly, break some of the ones that aren't, and make sure that
        # happiness accounting is preserved.
        d.addCallback(_set_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        def _do_server_setup_2(ign):
            self._add_server(1)
            self._add_server(2)
            self._add_server(3)
            self._add_server_with_share(4, 7, readonly=True)
            self._add_server_with_share(5, 8, readonly=True)
        d.addCallback(_do_server_setup_2)
        d.addCallback(_remove_server)
        d.addCallback(lambda ign:
            self._do_upload_with_broken_servers(1))
        d.addCallback(_set_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(_do_server_setup_2)
        d.addCallback(_remove_server)
        d.addCallback(lambda ign:
            self.shouldFail(UploadUnhappinessError,
                            "test_dropped_servers_in_encoder",
                            "shares could be placed on only 3 server(s) "
                            "such that any 3 of them have enough shares to "
                            "recover the file, but we were asked to place "
                            "shares on at least 4",
                            self._do_upload_with_broken_servers, 2))
        return d


    def test_merge_peers(self):
        # merge_peers merges a list of used_peers and a dict of
        # shareid -> peerid mappings.
        shares = {
                    1 : set(["server1"]),
                    2 : set(["server2"]),
                    3 : set(["server3"]),
                    4 : set(["server4", "server5"]),
                    5 : set(["server1", "server2"]),
                 }
        # if not provided with a used_peers argument, it should just
        # return the first argument unchanged.
        self.failUnlessEqual(shares, merge_peers(shares, set([])))
        class FakePeerTracker:
            pass
        trackers = []
        for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
            t = FakePeerTracker()
            t.peerid = server
            t.buckets = [i]
            trackers.append(t)
        expected = {
                    1 : set(["server1"]),
                    2 : set(["server2"]),
                    3 : set(["server3"]),
                    4 : set(["server4", "server5"]),
                    5 : set(["server1", "server2", "server5"]),
                    6 : set(["server6"]),
                    7 : set(["server7"]),
                    8 : set(["server8"]),
                   }
        self.failUnlessEqual(expected, merge_peers(shares, set(trackers)))
        shares2 = {}
        expected = {
                    5 : set(["server5"]),
                    6 : set(["server6"]),
                    7 : set(["server7"]),
                    8 : set(["server8"]),
                   }
        self.failUnlessEqual(expected, merge_peers(shares2, set(trackers)))
        shares3 = {}
        trackers = []
        expected = {}
        for (i, server) in [(i, "server%d" % i) for i in xrange(10)]:
            shares3[i] = set([server])
            t = FakePeerTracker()
            t.peerid = server
            t.buckets = [i]
            trackers.append(t)
            expected[i] = set([server])
        self.failUnlessEqual(expected, merge_peers(shares3, set(trackers)))


1266     def test_servers_of_happiness_utility_function(self):
1267         # These tests are concerned with the servers_of_happiness()
1268         # utility function and its underlying matching algorithm. Other
1269         # aspects of servers_of_happiness behavior are tested
1270         # elsewhere. These tests exist to ensure that
1271         # servers_of_happiness() doesn't under- or overcount the
1272         # happiness value for given inputs.
1273
1274         # servers_of_happiness expects a dict of 
1275         # shnum => set(peerids) as a preexisting shares argument.
1276         test1 = {
1277                  1 : set(["server1"]),
1278                  2 : set(["server2"]),
1279                  3 : set(["server3"]),
1280                  4 : set(["server4"])
1281                 }
1282         happy = servers_of_happiness(test1)
1283         self.failUnlessEqual(4, happy)
1284         test1[4] = set(["server1"])
1285         # Share 4 now lives on server1, which already holds share 1, so
1286         # servers_of_happiness should be 3 instead of 4.
1287         happy = servers_of_happiness(test1)
1288         self.failUnlessEqual(3, happy)
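             # Concretely: server1 can be matched to share 1 or to share 4,
             # but not to both, so one maximum matching pairs (server1,
             # share 1), (server2, share 2), (server3, share 3) and leaves
             # share 4 unmatched, giving a happiness value of 3.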
1289         # The second argument of merge_peers should be a set of
1290         # objects with peerid and buckets attributes. In actual use
1291         # these will be PeerTracker instances, but for testing it is
1292         # fine to use a FakePeerTracker that simply carries those two
1293         # attributes.
1294         class FakePeerTracker:
1295             pass
1296         trackers = []
1297         for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1298             t = FakePeerTracker()
1299             t.peerid = server
1300             t.buckets = [i]
1301             trackers.append(t)
1302         # Recall that test1 is a server layout with servers_of_happiness
1303         # = 3. Since the trackers contribute four new servers that don't
1304         # overlap with any shnum -> set([peerid]) correspondence in
1305         # test1, the result here should be 3 + 4 = 7.
1306         test2 = merge_peers(test1, set(trackers))
1307         happy = servers_of_happiness(test2)
1308         self.failUnlessEqual(7, happy)
1309         # Now add an overlapping server to trackers. This is redundant,
1310         # so it should not cause the previously reported happiness value
1311         # to change.
1312         t = FakePeerTracker()
1313         t.peerid = "server1"
1314         t.buckets = [1]
1315         trackers.append(t)
1316         test2 = merge_peers(test1, set(trackers))
1317         happy = servers_of_happiness(test2)
1318         self.failUnlessEqual(7, happy)
1319         test = {}
1320         happy = servers_of_happiness(test)
1321         self.failUnlessEqual(0, happy)
1322         # Test a more substantial overlap between the trackers and the 
1323         # existing assignments.
1324         test = {
1325             1 : set(['server1']),
1326             2 : set(['server2']),
1327             3 : set(['server3']),
1328             4 : set(['server4']),
1329         }
1330         trackers = []
1331         t = FakePeerTracker()
1332         t.peerid = 'server5'
1333         t.buckets = [4]
1334         trackers.append(t)
1335         t = FakePeerTracker()
1336         t.peerid = 'server6'
1337         t.buckets = [3, 5]
1338         trackers.append(t)
1339         # The value returned by servers_of_happiness is the size 
1340         # of a maximum matching in the bipartite graph that
1341         # servers_of_happiness() makes between peerids and share
1342         # numbers. It should find something like this:
1343         # (server 1, share 1)
1344         # (server 2, share 2)
1345         # (server 3, share 3)
1346         # (server 5, share 4)
1347         # (server 6, share 5)
1348         # 
1349         # and, since there are 5 edges in this matching, it should
1350         # return 5.
1351         test2 = merge_peers(test, set(trackers))
1352         happy = servers_of_happiness(test2)
1353         self.failUnlessEqual(5, happy)
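             # For reference, a maximum matching of the kind sketched in the
             # comment above can be computed with a simple augmenting-path
             # search (Kuhn's algorithm). This is only an illustrative
             # sketch; _toy_maximum_matching is a hypothetical helper, not
             # the actual implementation in allmydata.util.happinessutil.
             def _toy_maximum_matching(peers_to_shares):
                 # peers_to_shares: dict mapping peerid -> list of share numbers
                 matched = {} # share number -> peerid currently holding it
                 def _try_assign(peer, seen):
                     for share in peers_to_shares[peer]:
                         if share in seen:
                             continue
                         seen.add(share)
                         # Claim a free share, or evict a peer that can be
                         # reassigned to another of its shares.
                         if (share not in matched
                             or _try_assign(matched[share], seen)):
                             matched[share] = peer
                             return True
                     return False
                 count = 0
                 for peer in peers_to_shares:
                     if _try_assign(peer, set()):
                         count += 1
                 return count
             # For the layout above this sketch would give:
             #   _toy_maximum_matching({"server1": [1], "server2": [2],
             #                          "server3": [3], "server4": [4],
             #                          "server5": [4], "server6": [3, 5]}) == 5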
1354         # Zooko's first puzzle: 
1355         # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:156)
1356         #
1357         # server 1: shares 0, 1
1358         # server 2: shares 1, 2
1359         # server 3: share 2
1360         #
1361         # This should yield happiness of 3.
1362         test = {
1363             0 : set(['server1']),
1364             1 : set(['server1', 'server2']),
1365             2 : set(['server2', 'server3']),
1366         }
1367         self.failUnlessEqual(3, servers_of_happiness(test))
1368         # Zooko's second puzzle:        
1369         # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:158)
1370         # 
1371         # server 1: shares 0, 1
1372         # server 2: share 1
1373         # 
1374         # This should yield happiness of 2.
1375         test = {
1376             0 : set(['server1']),
1377             1 : set(['server1', 'server2']),
1378         }
1379         self.failUnlessEqual(2, servers_of_happiness(test))
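             # In both puzzles the matching is forced: the first pairs
             # (server1, share 0), (server2, share 1), (server3, share 2),
             # and the second pairs (server1, share 0), (server2, share 1),
             # so the happiness values are 3 and 2 respectively.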
1380
1381
1382     def test_shares_by_server(self):
1383         test = dict([(i, set(["server%d" % i])) for i in xrange(1, 5)])
1384         sbs = shares_by_server(test)
1385         self.failUnlessEqual(set([1]), sbs["server1"])
1386         self.failUnlessEqual(set([2]), sbs["server2"])
1387         self.failUnlessEqual(set([3]), sbs["server3"])
1388         self.failUnlessEqual(set([4]), sbs["server4"])
1389         test1 = {
1390                     1 : set(["server1"]),
1391                     2 : set(["server1"]),
1392                     3 : set(["server1"]),
1393                     4 : set(["server2"]),
1394                     5 : set(["server2"])
1395                 }
1396         sbs = shares_by_server(test1)
1397         self.failUnlessEqual(set([1, 2, 3]), sbs["server1"])
1398         self.failUnlessEqual(set([4, 5]), sbs["server2"])
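             # As the assertions above show, shares_by_server transposes a
             # shnum -> set(peerids) map into a peerid -> set(shnums) map:
             # e.g., {1: set(["server1"]), 2: set(["server1"])} becomes
             # {"server1": set([1, 2])}.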
1399         # This should fail unless the peerid part of the mapping is a set
1400         test2 = {1: "server1"}
1401         self.shouldFail(AssertionError,
1402                        "test_shares_by_server",
1403                        "",
1404                        shares_by_server, test2)
1405
1406
1407     def test_existing_share_detection(self):
1408         self.basedir = self.mktemp()
1409         d = self._setup_and_upload()
1410         # Our final setup should look like this:
1411         # server 1: shares 0 - 9, read-only
1412         # server 2: empty
1413         # server 3: empty
1414         # server 4: empty
1415         # The purpose of this test is to make sure that the peer selector
1416         # knows about the shares on server 1, even though it is read-only.
1417         # The selector used to simply filter read-only servers out, which
1418         # would cause the test to fail when servers_of_happiness = 4.
1419         d.addCallback(lambda ign:
1420             self._add_server_with_share(1, 0, True))
1421         d.addCallback(lambda ign:
1422             self._add_server(2))
1423         d.addCallback(lambda ign:
1424             self._add_server(3))
1425         d.addCallback(lambda ign:
1426             self._add_server(4))
1427         def _copy_shares(ign):
1428             for i in xrange(1, 10):
1429                 self._copy_share_to_server(i, 1)
1430         d.addCallback(_copy_shares)
1431         d.addCallback(lambda ign:
1432             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1433         def _prepare_client(ign):
1434             client = self.g.clients[0]
1435             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1436             return client
1437         d.addCallback(_prepare_client)
1438         d.addCallback(lambda client:
1439             client.upload(upload.Data("data" * 10000, convergence="")))
1440         return d
1441
1442
1443     def test_query_counting(self):
1444         # If peer selection fails, Tahoe2PeerSelector prints out a lot
1445         # of helpful diagnostic information, including query stats.
1446         # This test helps make sure that the information is accurate.
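             # The diagnostic message being checked has the form "sent N
             # queries to M peers, X queries placed some shares, Y placed
             # none (of which Z placed none due to the server being full
             # and W placed none due to an error)"; the full string is
             # exercised in test_exception_messages_during_peer_selection
             # below.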
1447         self.basedir = self.mktemp()
1448         d = self._setup_and_upload()
1449         def _setup(ign):
1450             for i in xrange(1, 11):
1451                 self._add_server(server_number=i)
1452             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1453             c = self.g.clients[0]
1454             # We set happy to an unsatisfiable value so that we can check the 
1455             # counting in the exception message. The same progress message
1456             # is also used when the upload is successful, but in that case it
1457             # only gets written to a log, so we can't see what it says.
1458             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1459             return c
1460         d.addCallback(_setup)
1461         d.addCallback(lambda c:
1462             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1463                             "10 queries placed some shares",
1464                             c.upload, upload.Data("data" * 10000,
1465                                                   convergence="")))
1466         # Now try with some readonly servers. We want to make sure that
1467         # the readonly peer share discovery phase is counted correctly.
1468         def _reset(ign):
1469             self.basedir = self.mktemp()
1470             self.g = None
1471         d.addCallback(_reset)
1472         d.addCallback(lambda ign:
1473             self._setup_and_upload())
1474         def _then(ign):
1475             for i in xrange(1, 11):
1476                 self._add_server(server_number=i)
1477             self._add_server(server_number=11, readonly=True)
1478             self._add_server(server_number=12, readonly=True)
1479             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1480             c = self.g.clients[0]
1481             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1482             return c
1483         d.addCallback(_then)
1484         d.addCallback(lambda c:
1485             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1486                             "2 placed none (of which 2 placed none due to "
1487                             "the server being full",
1488                             c.upload, upload.Data("data" * 10000,
1489                                                   convergence="")))
1490         # Now try the case where the upload process finds, on the first
1491         # server it queries, many of the shares that it wants to place,
1492         # including the one that it wanted to allocate there. Though no
1493         # shares will be allocated in this request, it should still be
1494         # counted as productive, since it caused some homeless shares
1495         # to be removed.
1496         d.addCallback(_reset)
1497         d.addCallback(lambda ign:
1498             self._setup_and_upload())
1499
1500         def _next(ign):
1501             for i in xrange(1, 11):
1502                 self._add_server(server_number=i)
1503             # Copy all of the shares to server 9, since that will be
1504             # the first one that the selector sees.
1505             for i in xrange(10):
1506                 self._copy_share_to_server(i, 9)
1507             # Remove server 0, and its contents
1508             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1509             # Make happiness unsatisfiable
1510             c = self.g.clients[0]
1511             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1512             return c
1513         d.addCallback(_next)
1514         d.addCallback(lambda c:
1515             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1516                             "1 queries placed some shares",
1517                             c.upload, upload.Data("data" * 10000,
1518                                                   convergence="")))
1519         return d
1520
1521
1522     def test_upper_limit_on_readonly_queries(self):
1523         self.basedir = self.mktemp()
1524         d = self._setup_and_upload()
1525         def _then(ign):
1526             for i in xrange(1, 11):
1527                 self._add_server(server_number=i, readonly=True)
1528             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1529             c = self.g.clients[0]
1530             c.DEFAULT_ENCODING_PARAMETERS['k'] = 2
1531             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1532             c.DEFAULT_ENCODING_PARAMETERS['n'] = 4
1533             return c
1534         d.addCallback(_then)
1535         d.addCallback(lambda client:
1536             self.shouldFail(UploadUnhappinessError,
1537                             "test_upper_limit_on_readonly_queries",
1538                             "sent 8 queries to 8 peers",
1539                             client.upload,
1540                             upload.Data('data' * 10000, convergence="")))
1541         return d
1542
1543
1544     def test_exception_messages_during_peer_selection(self):
1545         # server 1: read-only, no shares
1546         # server 2: read-only, no shares
1547         # server 3: read-only, no shares
1548         # server 4: read-only, no shares
1549         # server 5: read-only, no shares
1550         # This will fail, but we want to make sure that the log messages
1551         # are informative about why it has failed.
1552         self.basedir = self.mktemp()
1553         d = self._setup_and_upload()
1554         d.addCallback(lambda ign:
1555             self._add_server(server_number=1, readonly=True))
1556         d.addCallback(lambda ign:
1557             self._add_server(server_number=2, readonly=True))
1558         d.addCallback(lambda ign:
1559             self._add_server(server_number=3, readonly=True))
1560         d.addCallback(lambda ign:
1561             self._add_server(server_number=4, readonly=True))
1562         d.addCallback(lambda ign:
1563             self._add_server(server_number=5, readonly=True))
1564         d.addCallback(lambda ign:
1565             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1566         def _reset_encoding_parameters(ign):
1567             client = self.g.clients[0]
1568             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1569             return client
1570         d.addCallback(_reset_encoding_parameters)
1571         d.addCallback(lambda client:
1572             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1573                             "placed 0 shares out of 10 "
1574                             "total (10 homeless), want to place shares on at "
1575                             "least 4 servers such that any 3 of them have "
1576                             "enough shares to recover the file, "
1577                             "sent 5 queries to 5 peers, 0 queries placed "
1578                             "some shares, 5 placed none "
1579                             "(of which 5 placed none due to the server being "
1580                             "full and 0 placed none due to an error)",
1581                             client.upload,
1582                             upload.Data("data" * 10000, convergence="")))
1583
1584
1585         # server 1: read-only, no shares
1586         # server 2: broken, no shares
1587         # server 3: read-only, no shares
1588         # server 4: read-only, no shares
1589         # server 5: read-only, no shares
1590         def _reset(ign):
1591             self.basedir = self.mktemp()
1592         d.addCallback(_reset)
1593         d.addCallback(lambda ign:
1594             self._setup_and_upload())
1595         d.addCallback(lambda ign:
1596             self._add_server(server_number=1, readonly=True))
1597         d.addCallback(lambda ign:
1598             self._add_server(server_number=2))
1599         def _break_server_2(ign):
1600             server = self.g.servers_by_number[2].my_nodeid
1601             # We have to break the server in servers_by_id, 
1602             # because the one in servers_by_number isn't wrapped,
1603             # and doesn't look at its broken attribute when answering
1604             # queries.
1605             self.g.servers_by_id[server].broken = True
1606         d.addCallback(_break_server_2)
1607         d.addCallback(lambda ign:
1608             self._add_server(server_number=3, readonly=True))
1609         d.addCallback(lambda ign:
1610             self._add_server(server_number=4, readonly=True))
1611         d.addCallback(lambda ign:
1612             self._add_server(server_number=5, readonly=True))
1613         d.addCallback(lambda ign:
1614             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1615         def _reset_encoding_parameters(ign, happy=4):
1616             client = self.g.clients[0]
1617             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1618             return client
1619         d.addCallback(_reset_encoding_parameters)
1620         d.addCallback(lambda client:
1621             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1622                             "placed 0 shares out of 10 "
1623                             "total (10 homeless), want to place shares on at "
1624                             "least 4 servers such that any 3 of them have "
1625                             "enough shares to recover the file, "
1626                             "sent 5 queries to 5 peers, 0 queries placed "
1627                             "some shares, 5 placed none "
1628                             "(of which 4 placed none due to the server being "
1629                             "full and 1 placed none due to an error)",
1630                             client.upload,
1631                             upload.Data("data" * 10000, convergence="")))
1632         # server 0, server 1 = empty, accepting shares
1633         # This should place all of the shares, but still fail with happy=4.
1634         # We want to make sure that the exception message is worded correctly.
1635         d.addCallback(_reset)
1636         d.addCallback(lambda ign:
1637             self._setup_grid())
1638         d.addCallback(lambda ign:
1639             self._add_server(server_number=1))
1640         d.addCallback(_reset_encoding_parameters)
1641         d.addCallback(lambda client:
1642             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1643                             "shares could be placed or found on only 2 "
1644                             "server(s). We were asked to place shares on at "
1645                             "least 4 server(s) such that any 3 of them have "
1646                             "enough shares to recover the file.",
1647                             client.upload, upload.Data("data" * 10000,
1648                                                        convergence="")))
1649         # servers 0 - 4 = empty, accepting shares
1650         # This too should place all the shares, and this too should fail,
1651         # but since the effective happiness is more than the k encoding
1652         # parameter, it should trigger a different error message than the one
1653         # above.
1654         d.addCallback(_reset)
1655         d.addCallback(lambda ign:
1656             self._setup_grid())
1657         d.addCallback(lambda ign:
1658             self._add_server(server_number=1))
1659         d.addCallback(lambda ign:
1660             self._add_server(server_number=2))
1661         d.addCallback(lambda ign:
1662             self._add_server(server_number=3))
1663         d.addCallback(lambda ign:
1664             self._add_server(server_number=4))
1665         d.addCallback(_reset_encoding_parameters, happy=7)
1666         d.addCallback(lambda client:
1667             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1668                             "shares could be placed on only 5 server(s) such "
1669                             "that any 3 of them have enough shares to recover "
1670                             "the file, but we were asked to place shares on "
1671                             "at least 7 such servers.",
1672                             client.upload, upload.Data("data" * 10000,
1673                                                        convergence="")))
1674         # server 0: shares 0 - 9
1675         # server 1: share 0, read-only
1676         # server 2: share 0, read-only
1677         # server 3: share 0, read-only
1678         # This should place all of the shares, but fail with happy=4.
1679         # Since the number of servers with shares is more than the number
1680         # necessary to reconstitute the file, this will trigger a different
1681         # error message than either of those above. 
1682         d.addCallback(_reset)
1683         d.addCallback(lambda ign:
1684             self._setup_and_upload())
1685         d.addCallback(lambda ign:
1686             self._add_server_with_share(server_number=1, share_number=0,
1687                                         readonly=True))
1688         d.addCallback(lambda ign:
1689             self._add_server_with_share(server_number=2, share_number=0,
1690                                         readonly=True))
1691         d.addCallback(lambda ign:
1692             self._add_server_with_share(server_number=3, share_number=0,
1693                                         readonly=True))
1694         d.addCallback(_reset_encoding_parameters, happy=7)
1695         d.addCallback(lambda client:
1696             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1697                             "shares could be placed or found on 4 server(s), "
1698                             "but they are not spread out evenly enough to "
1699                             "ensure that any 3 of these servers would have "
1700                             "enough shares to recover the file. We were asked "
1701                             "to place shares on at least 7 servers such that "
1702                             "any 3 of them have enough shares to recover the "
1703                             "file",
1704                             client.upload, upload.Data("data" * 10000,
1705                                                        convergence="")))
1706         return d
1707
1708
1709     def test_problem_layout_comment_187(self):
1710         # #778 comment 187 broke an initial attempt at a share
1711         # redistribution algorithm. This test is here to demonstrate the
1712         # breakage, and to test that subsequent algorithms don't also
1713         # break in the same way.
1714         self.basedir = self.mktemp()
1715         d = self._setup_and_upload(k=2, n=3)
1716
1717         # server 1: shares 0, 1, 2, readonly
1718         # server 2: share 0, readonly
1719         # server 3: share 0
1720         def _setup(ign):
1721             self._add_server_with_share(server_number=1, share_number=0,
1722                                         readonly=True)
1723             self._add_server_with_share(server_number=2, share_number=0,
1724                                         readonly=True)
1725             self._add_server_with_share(server_number=3, share_number=0)
1726             # Copy shares 1 and 2 to server 1
1727             self._copy_share_to_server(1, 1)
1728             self._copy_share_to_server(2, 1)
1729             # Remove server 0
1730             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1731             client = self.g.clients[0]
1732             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 3
1733             return client
1734
1735         d.addCallback(_setup)
1736         d.addCallback(lambda client:
1737             client.upload(upload.Data("data" * 10000, convergence="")))
1738         return d
1739     test_problem_layout_comment_187.todo = "this isn't fixed yet"
1740
1741
1742     def test_upload_succeeds_with_some_homeless_shares(self):
1743         # If the upload is forced to stop trying to place shares before
1744         # it has placed (or otherwise accounted for) all of them, but it
1745         # has placed enough to satisfy the upload health criteria that
1746         # we're using, it should still succeed.
1747         self.basedir = self.mktemp()
1748         d = self._setup_and_upload()
1749         def _server_setup(ign):
1750             # Add four servers so that we have a layout like this:
1751             # server 1: share 0, read-only
1752             # server 2: share 1, read-only
1753             # server 3: share 2, read-only
1754             # server 4: share 3, read-only
1755             # If we set happy = 4, the upload will manage to satisfy
1756             # servers of happiness, but not place all of the shares; we
1757             # want to test that the upload is declared successful in
1758             # this case.
1759             self._add_server_with_share(server_number=1, share_number=0,
1760                                         readonly=True)
1761             self._add_server_with_share(server_number=2, share_number=1,
1762                                         readonly=True)
1763             self._add_server_with_share(server_number=3, share_number=2,
1764                                         readonly=True)
1765             self._add_server_with_share(server_number=4, share_number=3,
1766                                         readonly=True)
1767             # Remove server 0.
1768             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1769             # Set the client appropriately
1770             c = self.g.clients[0]
1771             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1772             return c
1773         d.addCallback(_server_setup)
1774         d.addCallback(lambda client:
1775             client.upload(upload.Data("data" * 10000, convergence="")))
1776         return d
1777
1778
1779     def test_uploader_skips_over_servers_with_only_one_share(self):
1780         # We want to make sure that the redistribution logic ignores
1781         # servers with only one share, since placing these shares
1782         # elsewhere will at best keep happiness the same as it was, and
1783         # at worst hurt it.
1784         self.basedir = self.mktemp()
1785         d = self._setup_and_upload()
1786         def _server_setup(ign):
1787             # Add some servers so that the upload will need to
1788             # redistribute, but will first pass over a couple of servers
1789             # that don't have enough shares to redistribute before
1790             # finding one that does.
1791             self._add_server_with_share(server_number=1, share_number=0)
1792             self._add_server_with_share(server_number=2, share_number=2)
1793             self._add_server_with_share(server_number=3, share_number=1)
1794             self._add_server_with_share(server_number=8, share_number=4)
1795             self._add_server_with_share(server_number=5, share_number=5)
1796             self._add_server_with_share(server_number=10, share_number=7)
1797             for i in xrange(4):
1798                 self._copy_share_to_server(i, 2)
1799             return self.g.clients[0]
1800         d.addCallback(_server_setup)
1801         d.addCallback(lambda client:
1802             client.upload(upload.Data("data" * 10000, convergence="")))
1803         return d
1804
1805
1806     def _set_up_nodes_extra_config(self, clientdir):
1807         cfgfn = os.path.join(clientdir, "tahoe.cfg")
1808         oldcfg = open(cfgfn, "r").read()
1809         f = open(cfgfn, "wt")
1810         f.write(oldcfg)
1811         f.write("\n")
1812         f.write("[client]\n")
1813         f.write("shares.needed = 7\n")
1814         f.write("shares.total = 12\n")
1815         f.write("\n")
1816         f.close()
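             # The stanza appended above gives the client 7-of-12 erasure
             # coding; the resulting tahoe.cfg ends with:
             #   [client]
             #   shares.needed = 7
             #   shares.total = 12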
1817         return None
1818
1819 # TODO:
1820 #  upload with exactly 75 peers (shares_of_happiness)
1821 #  have a download fail
1822 #  cancel a download (need to implement more cancel stuff)