import os, shutil
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.python.failure import Failure
from twisted.python import log
from twisted.internet import defer
from foolscap.api import fireEventually

import allmydata # for __full_version__
from allmydata import uri, monitor, client
from allmydata.immutable import upload, encode
from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import DeferredListShouldSucceed
from allmydata.util.happinessutil import servers_of_happiness, merge_peers
from allmydata.test.no_network import GridTestMixin
from allmydata.test.common_util import ShouldFailMixin
from allmydata.storage_client import StorageFarmBroker
from allmydata.storage.server import storage_index_to_dir

MiB = 1024*1024

def extract_uri(results):
    return results.uri

# Some of these took longer than 480 seconds on Zandr's arm box, but this may
# have been due to an earlier test ERROR'ing out due to timeout, which seems
# to screw up subsequent tests.
timeout = 960

class Uploadable(unittest.TestCase):
    def shouldEqual(self, data, expected):
        self.failUnless(isinstance(data, list))
        for e in data:
            self.failUnless(isinstance(e, str))
        s = "".join(data)
        self.failUnlessEqual(s, expected)

    def test_filehandle_random_key(self):
        return self._test_filehandle(convergence=None)

    def test_filehandle_convergent_encryption(self):
        return self._test_filehandle(convergence="some convergence string")

    def _test_filehandle(self, convergence):
        s = StringIO("a"*41)
        u = upload.FileHandle(s, convergence=convergence)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
        d.addCallback(lambda res: s.close()) # that privilege is reserved for us
        return d

    def test_filename(self):
        basedir = "upload/Uploadable/test_filename"
        os.makedirs(basedir)
        fn = os.path.join(basedir, "file")
        f = open(fn, "w")
        f.write("a"*41)
        f.close()
        u = upload.FileName(fn, convergence=None)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close())
        return d

    def test_data(self):
        s = "a"*41
        u = upload.Data(s, convergence=None)
        d = u.get_size()
        d.addCallback(self.failUnlessEqual, 41)
        d.addCallback(lambda res: u.read(1))
        d.addCallback(self.shouldEqual, "a")
        d.addCallback(lambda res: u.read(80))
        d.addCallback(self.shouldEqual, "a"*40)
        d.addCallback(lambda res: u.close())
        return d

class ServerError(Exception):
    pass

class SetDEPMixin:
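    # Mixin that installs custom default encoding parameters on the fake
    # client node: k (shares needed to recover), happy (the
    # servers-of-happiness threshold), n (total shares), and the maximum
    # segment size.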
    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        p = {"k": k,
             "happy": happy,
             "n": n,
             "max_segment_size": max_segsize,
             }
        self.node.DEFAULT_ENCODING_PARAMETERS = p

class FakeStorageServer:
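    # A stand-in for a remote storage server. The 'mode' argument controls
    # how allocate_buckets() behaves: "full" accepts no shares, "already
    # got them" claims it already holds every requested share,
    # "first-fail"/"second-fail" raise ServerError on the first/second
    # query, "small" advertises a 10-byte maximum share size in its version
    # announcement, and "good" (or any other mode) accepts everything.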
    def __init__(self, mode):
        self.mode = mode
        self.allocated = []
        self.queries = 0
        self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                         { "maximum-immutable-share-size": 2**32 },
                         "application-version": str(allmydata.__full_version__),
                         }
        if mode == "small":
            self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                             { "maximum-immutable-share-size": 10 },
                             "application-version": str(allmydata.__full_version__),
                             }

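    # fireEventually() (from foolscap) returns a Deferred that fires after a
    # turn of the reactor, so each fake remote call completes
    # asynchronously, like a real network round trip.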
    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self, methname)
            return meth(*args, **kwargs)
        d = fireEventually()
        d.addCallback(lambda res: _call())
        return d

    def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
                         sharenums, share_size, canary):
        #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
        if self.mode == "first-fail":
            if self.queries == 0:
                raise ServerError
        if self.mode == "second-fail":
            if self.queries == 1:
                raise ServerError
        self.queries += 1
        if self.mode == "full":
            return (set(), {},)
        elif self.mode == "already got them":
            return (set(sharenums), {},)
        else:
            for shnum in sharenums:
                self.allocated.append( (storage_index, shnum) )
            return (set(),
                    dict([( shnum, FakeBucketWriter(share_size) )
                          for shnum in sharenums]),
                    )

class FakeBucketWriter:
    # a diagnostic version of storageserver.BucketWriter
    def __init__(self, size):
        self.data = StringIO()
        self.closed = False
        self._size = size

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self, "remote_" + methname)
            return meth(*args, **kwargs)
        d = fireEventually()
        d.addCallback(lambda res: _call())
        return d

    def remote_write(self, offset, data):
        precondition(not self.closed)
        precondition(offset >= 0)
        precondition(offset+len(data) <= self._size,
                     "offset=%d + data=%d > size=%d" %
                     (offset, len(data), self._size))
        self.data.seek(offset)
        self.data.write(data)

    def remote_close(self):
        precondition(not self.closed)
        self.closed = True

    def remote_abort(self):
        log.err(RuntimeError("uh oh, I was asked to abort"))

class FakeClient:
    DEFAULT_ENCODING_PARAMETERS = {"k":25,
                                   "happy": 25,
                                   "n": 100,
                                   "max_segment_size": 1*MiB,
                                   }
    def __init__(self, mode="good", num_servers=50):
        self.num_servers = num_servers
        if type(mode) is str:
            mode = dict([i,mode] for i in range(num_servers))
        peers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
                  for fakeid in range(self.num_servers) ]
        self.storage_broker = StorageFarmBroker(None, permute_peers=True)
        for (serverid, server) in peers:
            self.storage_broker.test_add_server(serverid, server)
        self.last_peers = [p[1] for p in peers]

    def log(self, *args, **kwargs):
        pass
    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS
    def get_storage_broker(self):
        return self.storage_broker
    _secret_holder = client.SecretHolder("lease secret", "convergence secret")

class GotTooFarError(Exception):
    pass

class GiganticUploadable(upload.FileHandle):
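    # An uploadable that pretends to hold 'size' bytes of zeros without
    # ever allocating them; read() raises GotTooFarError after about 1MB so
    # the too-large tests can stop early instead of streaming 2**64 bytes.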
    def __init__(self, size):
        self._size = size
        self._fp = 0

    def get_encryption_key(self):
        return defer.succeed("\x00" * 16)
    def get_size(self):
        return defer.succeed(self._size)
    def read(self, length):
        left = self._size - self._fp
        length = min(left, length)
        self._fp += length
        if self._fp > 1000000:
            # terminate the test early.
            raise GotTooFarError("we shouldn't be allowed to get this far")
        return defer.succeed(["\x00" * length])
    def close(self):
        pass

DATA = """
Once upon a time, there was a beautiful princess named Buttercup. She lived
in a magical land where every file was stored securely among millions of
machines, and nobody ever worried about their data being lost ever again.
The End.
"""
assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD

SIZE_ZERO = 0
SIZE_SMALL = 16
SIZE_LARGE = len(DATA)

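# Convenience wrappers around Uploader.upload(). Passing convergence=None
# selects a random per-file encryption key rather than convergent encryption.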
def upload_data(uploader, data):
    u = upload.Data(data, convergence=None)
    return uploader.upload(u)
def upload_filename(uploader, filename):
    u = upload.FileName(filename, convergence=None)
    return uploader.upload(u)
def upload_filehandle(uploader, fh):
    u = upload.FileHandle(fh, convergence=None)
    return uploader.upload(u)

class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
    def setUp(self):
        self.node = FakeClient(mode="good")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _check_small(self, newuri, size):
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.LiteralFileURI))
        self.failUnlessEqual(len(u.data), size)

    def _check_large(self, newuri, size):
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def get_data(self, size):
        return DATA[:size]

    def test_too_large(self):
        # we've removed the 4GiB share size limit (see ticket #346 for
        # details), but still have an 8-byte field, so the limit is now
        # 2**64, so make sure we reject files larger than that.
        k = 3; happy = 7; n = 10
        self.set_encoding_parameters(k, happy, n)
        big = k*(2**64)
        data1 = GiganticUploadable(big)
        d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
                            "This file is too large to be uploaded (data_size)",
                            self.u.upload, data1)
        data2 = GiganticUploadable(big-3)
        d.addCallback(lambda res:
                      self.shouldFail(FileTooLargeError,
                                      "test_too_large-data2",
                                      "This file is too large to be uploaded (offsets)",
                                      self.u.upload, data2))
        # I don't know where the actual limit is.. it depends upon how large
        # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
        return d

    def test_data_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_data_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_data_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_data_large_odd_segments(self):
        data = self.get_data(SIZE_LARGE)
        segsize = int(SIZE_LARGE / 2.5)
        # we want 3 segments, since that's not a power of two
        self.set_encoding_parameters(25, 25, 100, segsize)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_filehandle_zero(self):
        data = self.get_data(SIZE_ZERO)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_filehandle_small(self):
        data = self.get_data(SIZE_SMALL)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_filehandle_large(self):
        data = self.get_data(SIZE_LARGE)
        d = upload_filehandle(self.u, StringIO(data))
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_filename_zero(self):
        fn = "Uploader-test_filename_zero.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_ZERO)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_ZERO)
        return d

    def test_filename_small(self):
        fn = "Uploader-test_filename_small.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_SMALL)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_small, SIZE_SMALL)
        return d

    def test_filename_large(self):
        fn = "Uploader-test_filename_large.data"
        f = open(fn, "wb")
        data = self.get_data(SIZE_LARGE)
        f.write(data)
        f.close()
        d = upload_filename(self.u, fn)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
    def make_node(self, mode, num_servers=10):
        self.node = FakeClient(mode, num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _check_large(self, newuri, size):
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def test_first_error(self):
        mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
        self.make_node(mode)
        self.set_encoding_parameters(k=25, happy=1, n=50)
        d = upload_data(self.u, DATA)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_first_error_all(self):
        self.make_node("first-fail")
        d = self.shouldFail(UploadUnhappinessError, "first_error_all",
                            "peer selection failed",
                            upload_data, self.u, DATA)
        def _check((f,)):
            self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
            # there should also be a 'last failure was' message
            self.failUnlessIn("ServerError", str(f.value))
        d.addCallback(_check)
        return d

    def test_second_error(self):
        # we want to make sure we make it to a third pass. This means that
        # the first pass was insufficient to place all shares, and at least
        # one of the second-pass servers (other than the last one) accepted
        # a share (so we'll believe that a third pass will be useful). (if
        # everyone but the last server throws an error, then we'll send all
        # the remaining shares to the last server at the end of the second
        # pass, and if that succeeds, we won't make it to a third pass).
        #
        # we can achieve this 97.5% of the time by using 40 servers, having
        # 39 of them fail on the second request, leaving only one to succeed
        # on the second request. (we need to keep the number of servers low
        # enough to ensure a second pass with 100 shares).
        mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
        self.make_node(mode, 40)
        d = upload_data(self.u, DATA)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        return d

    def test_second_error_all(self):
        self.make_node("second-fail")
        d = self.shouldFail(UploadUnhappinessError, "second_error_all",
                            "peer selection failed",
                            upload_data, self.u, DATA)
        def _check((f,)):
            self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
            # there should also be a 'last failure was' message
            self.failUnlessIn("ServerError", str(f.value))
        d.addCallback(_check)
        return d

class FullServer(unittest.TestCase):
    def setUp(self):
        self.node = FakeClient(mode="full")
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def _should_fail(self, f):
        self.failUnless(isinstance(f, Failure) and f.check(UploadUnhappinessError), f)

    def test_data_large(self):
        data = DATA
        d = upload_data(self.u, data)
        d.addBoth(self._should_fail)
        return d

class PeerSelection(unittest.TestCase):

    def make_client(self, num_servers=50):
        self.node = FakeClient(mode="good", num_servers=num_servers)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

    def get_data(self, size):
        return DATA[:size]

    def _check_large(self, newuri, size):
        u = uri.from_string(newuri)
        self.failUnless(isinstance(u, uri.CHKFileURI))
        self.failUnless(isinstance(u.get_storage_index(), str))
        self.failUnlessEqual(len(u.get_storage_index()), 16)
        self.failUnless(isinstance(u.key, str))
        self.failUnlessEqual(len(u.key), 16)
        self.failUnlessEqual(u.size, size)

    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
        p = {"k": k,
             "happy": happy,
             "n": n,
             "max_segment_size": max_segsize,
             }
        self.node.DEFAULT_ENCODING_PARAMETERS = p

    def test_one_each(self):
        # if we have 50 shares, and there are 50 peers, and they all accept a
        # share, we should get exactly one share per peer

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(25, 30, 50)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 1)
                self.failUnlessEqual(p.queries, 1)
        d.addCallback(_check)
        return d

    def test_two_each(self):
        # if we have 100 shares, and there are 50 peers, and they all accept
        # all shares, we should get exactly two shares per peer

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        # if there are 50 peers, then happy needs to be <= 50
        self.set_encoding_parameters(50, 50, 100)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 2)
                self.failUnlessEqual(p.queries, 2)
        d.addCallback(_check)
        return d

    def test_one_each_plus_one_extra(self):
        # if we have 51 shares, and there are 50 peers, then one peer gets
        # two shares and the rest get just one

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(24, 41, 51)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            got_one = []
            got_two = []
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnless(len(allocated) in (1,2), len(allocated))
                if len(allocated) == 1:
                    self.failUnlessEqual(p.queries, 1)
                    got_one.append(p)
                else:
                    self.failUnlessEqual(p.queries, 2)
                    got_two.append(p)
            self.failUnlessEqual(len(got_one), 49)
            self.failUnlessEqual(len(got_two), 1)
        d.addCallback(_check)
        return d

    def test_four_each(self):
        # if we have 200 shares, and there are 50 peers, then each peer gets
        # 4 shares. The design goal is to accomplish this with only two
        # queries per peer.

        self.make_client()
        data = self.get_data(SIZE_LARGE)
        # if there are 50 peers, then happy should be no more than 50 if
        # we want this to work.
        self.set_encoding_parameters(100, 50, 200)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            for p in self.node.last_peers:
                allocated = p.allocated
                self.failUnlessEqual(len(allocated), 4)
                self.failUnlessEqual(p.queries, 2)
        d.addCallback(_check)
        return d

    def test_three_of_ten(self):
        # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
        # 4+4+2

        self.make_client(3)
        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 3, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            counts = {}
            for p in self.node.last_peers:
                allocated = p.allocated
                counts[len(allocated)] = counts.get(len(allocated), 0) + 1
            histogram = [counts.get(i, 0) for i in range(5)]
            self.failUnlessEqual(histogram, [0,0,0,2,1])
        d.addCallback(_check)
        return d

    def test_some_big_some_small(self):
        # 10 shares, 20 servers, but half the servers don't support a
        # share-size large enough for our file
        mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)])
        self.node = FakeClient(mode, num_servers=20)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node

        data = self.get_data(SIZE_LARGE)
        self.set_encoding_parameters(3, 5, 10)
        d = upload_data(self.u, data)
        d.addCallback(extract_uri)
        d.addCallback(self._check_large, SIZE_LARGE)
        def _check(res):
            # we should have put one share each on the big peers, and zero
            # shares on the small peers
            total_allocated = 0
            for p in self.node.last_peers:
                if p.mode == "good":
                    self.failUnlessEqual(len(p.allocated), 1)
                elif p.mode == "small":
                    self.failUnlessEqual(len(p.allocated), 0)
                total_allocated += len(p.allocated)
            self.failUnlessEqual(total_allocated, 10)
        d.addCallback(_check)
        return d


class StorageIndex(unittest.TestCase):
    def test_params_must_matter(self):
        DATA = "I am some data"
        u = upload.Data(DATA, convergence="")
        eu = upload.EncryptAnUploadable(u)
        d1 = eu.get_storage_index()

        # CHK means the same data should encrypt the same way
        u = upload.Data(DATA, convergence="")
        eu = upload.EncryptAnUploadable(u)
        d1a = eu.get_storage_index()

        # but if we use a different convergence string it should be different
        u = upload.Data(DATA, convergence="wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt1 = eu.get_storage_index()

        # and if we add yet a different convergence it should be different again
        u = upload.Data(DATA, convergence="NOT wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt2 = eu.get_storage_index()

        # and if we use the first string again it should be the same as last time
        u = upload.Data(DATA, convergence="wheee!")
        eu = upload.EncryptAnUploadable(u)
        d1salt1a = eu.get_storage_index()

        # and if we change the encoding parameters, it should be different
        # (from the same convergence string with different encoding parameters)
        u = upload.Data(DATA, convergence="")
        u.encoding_param_k = u.default_encoding_param_k + 1
        eu = upload.EncryptAnUploadable(u)
        d2 = eu.get_storage_index()

        # and if we use a random key, it should be different than the CHK
        u = upload.Data(DATA, convergence=None)
        eu = upload.EncryptAnUploadable(u)
        d3 = eu.get_storage_index()
        # and different from another instance
        u = upload.Data(DATA, convergence=None)
        eu = upload.EncryptAnUploadable(u)
        d4 = eu.get_storage_index()

        d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
        def _done(res):
            si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
            self.failUnlessEqual(si1, si1a)
            self.failIfEqual(si1, si2)
            self.failIfEqual(si1, si3)
            self.failIfEqual(si1, si4)
            self.failIfEqual(si3, si4)
            self.failIfEqual(si1salt1, si1)
            self.failIfEqual(si1salt1, si1salt2)
            self.failIfEqual(si1salt2, si1)
            self.failUnlessEqual(si1salt1, si1salt1a)
        d.addCallback(_done)
        return d

class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
    ShouldFailMixin):
    def _do_upload_with_broken_servers(self, servers_to_break):
        """
        I act like a normal upload, but before I send the results of
        Tahoe2PeerSelector to the Encoder, I break the first servers_to_break
        PeerTrackers in the used_peers part of the return result.
        """
        assert self.g, "I tried to find a grid at self.g, but failed"
        broker = self.g.clients[0].storage_broker
        sh     = self.g.clients[0]._secret_holder
        data = upload.Data("data" * 10000, convergence="")
        data.encoding_param_k = 3
        data.encoding_param_happy = 4
        data.encoding_param_n = 10
        uploadable = upload.EncryptAnUploadable(data)
        encoder = encode.Encoder()
        encoder.set_encrypted_uploadable(uploadable)
        status = upload.UploadStatus()
        selector = upload.Tahoe2PeerSelector("dglev", "test", status)
        storage_index = encoder.get_param("storage_index")
        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        d = selector.get_shareholders(broker, sh, storage_index,
                                      share_size, block_size, num_segments,
                                      10, 3, 4)
        def _have_shareholders((used_peers, already_peers)):
            assert servers_to_break <= len(used_peers)
            for index in xrange(servers_to_break):
                server = list(used_peers)[index]
                for share in server.buckets.keys():
                    server.buckets[share].abort()
            buckets = {}
            servermap = already_peers.copy()
            for peer in used_peers:
                buckets.update(peer.buckets)
                for bucket in peer.buckets:
                    servermap.setdefault(bucket, set()).add(peer.peerid)
            encoder.set_shareholders(buckets, servermap)
            d = encoder.start()
            return d
        d.addCallback(_have_shareholders)
        return d


    def _add_server(self, server_number, readonly=False):
        assert self.g, "I tried to find a grid at self.g, but failed"
        ss = self.g.make_server(server_number, readonly)
        self.g.add_server(server_number, ss)


    def _add_server_with_share(self, server_number, share_number=None,
                               readonly=False):
        self._add_server(server_number, readonly)
        if share_number is not None:
            self._copy_share_to_server(share_number, server_number)


    def _copy_share_to_server(self, share_number, server_number):
        ss = self.g.servers_by_number[server_number]
        # Copy share i from the directory associated with the first
        # storage server to the directory associated with this one.
        assert self.g, "I tried to find a grid at self.g, but failed"
        assert self.shares, "I tried to find shares at self.shares, but failed"
        old_share_location = self.shares[share_number][2]
        new_share_location = os.path.join(ss.storedir, "shares")
        si = uri.from_string(self.uri).get_storage_index()
        new_share_location = os.path.join(new_share_location,
                                          storage_index_to_dir(si))
        if not os.path.exists(new_share_location):
            os.makedirs(new_share_location)
        new_share_location = os.path.join(new_share_location,
                                          str(share_number))
        if old_share_location != new_share_location:
            shutil.copy(old_share_location, new_share_location)
        shares = self.find_shares(self.uri)
        # Make sure that the storage server has the share.
        self.failUnless((share_number, ss.my_nodeid, new_share_location)
                        in shares)

    def _setup_grid(self):
        """
        I set up a NoNetworkGrid with a single server and client.
        """
        self.set_up_grid(num_clients=1, num_servers=1)

    def _setup_and_upload(self, **kwargs):
        """
        I set up a NoNetworkGrid with a single server and client,
        upload a file to it, store its uri in self.uri, and store its
        sharedata in self.shares.
        """
        self._setup_grid()
        client = self.g.clients[0]
        client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
        if "n" in kwargs and "k" in kwargs:
            client.DEFAULT_ENCODING_PARAMETERS['k'] = kwargs['k']
            client.DEFAULT_ENCODING_PARAMETERS['n'] = kwargs['n']
        data = upload.Data("data" * 10000, convergence="")
        self.data = data
        d = client.upload(data)
        def _store_uri(ur):
            self.uri = ur.uri
        d.addCallback(_store_uri)
        d.addCallback(lambda ign:
            self.find_shares(self.uri))
        def _store_shares(shares):
            self.shares = shares
        d.addCallback(_store_shares)
        return d


    def test_configure_parameters(self):
        self.basedir = self.mktemp()
        hooks = {0: self._set_up_nodes_extra_config}
        self.set_up_grid(client_config_hooks=hooks)
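        # The _set_up_nodes_extra_config hook (defined elsewhere in this
        # file) rewrites the client's encoding configuration; the checker
        # results below assume it sets shares.needed=7 and shares.total=12.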
        c0 = self.g.clients[0]

        DATA = "data" * 100
        u = upload.Data(DATA, convergence="")
        d = c0.upload(u)
        d.addCallback(lambda ur: c0.create_node_from_uri(ur.uri))
        m = monitor.Monitor()
        d.addCallback(lambda fn: fn.check(m))
        def _check(cr):
            data = cr.get_data()
            self.failUnlessEqual(data["count-shares-needed"], 7)
            self.failUnlessEqual(data["count-shares-expected"], 12)
        d.addCallback(_check)
        return d


    def _setUp(self, ns):
        # Used by test_happy_semantics and test_preexisting_share_behavior
        # to set up the grid.
        self.node = FakeClient(mode="good", num_servers=ns)
        self.u = upload.Uploader()
        self.u.running = True
        self.u.parent = self.node


    def test_happy_semantics(self):
        self._setUp(2)
        DATA = upload.Data("kittens" * 10000, convergence="")
        # These parameters are unsatisfiable with only 2 servers.
        self.set_encoding_parameters(k=3, happy=5, n=10)
        d = self.shouldFail(UploadUnhappinessError, "test_happy_semantics",
                            "shares could be placed or found on only 2 "
                            "server(s). We were asked to place shares on "
                            "at least 5 server(s) such that any 3 of them "
                            "have enough shares to recover the file",
                            self.u.upload, DATA)
        # Let's reset the client to have 10 servers
        d.addCallback(lambda ign:
            self._setUp(10))
        # These parameters are satisfiable with 10 servers.
        d.addCallback(lambda ign:
            self.set_encoding_parameters(k=3, happy=5, n=10))
        d.addCallback(lambda ign:
            self.u.upload(DATA))
        # Let's reset the client to have 7 servers
        # (this is less than n, but more than h)
        d.addCallback(lambda ign:
            self._setUp(7))
        # These parameters are satisfiable with 7 servers.
        d.addCallback(lambda ign:
            self.set_encoding_parameters(k=3, happy=5, n=10))
        d.addCallback(lambda ign:
            self.u.upload(DATA))
        return d


    def test_problem_layout_comment_52(self):
        def _basedir():
            self.basedir = self.mktemp()
        _basedir()
        # This scenario is at
        # http://allmydata.org/trac/tahoe/ticket/778#comment:52
        #
        # The scenario in comment:52 proposes that we have a layout
        # like:
        # server 0: shares 1 - 9
        # server 1: share 0, read-only
        # server 2: share 0, read-only
        # server 3: share 0, read-only
        # To get access to the shares, we will first upload to one
        # server, which will then have shares 0 - 9. We'll then
        # add three new servers, configure them to not accept any new
        # shares, then write share 0 directly into the serverdir of each,
        # and then remove share 0 from server 0 in the same way.
        # Then each of servers 1 - 3 will report that they have share 0,
        # and will not accept any new share, while server 0 will report that
        # it has shares 1 - 9 and will accept new shares.
        # We'll then set 'happy' = 4, and see that an upload fails
        # (as it should)
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=0,
                                        readonly=True))
        # Remove the first share from server 0.
        def _remove_share_0_from_server_0():
            share_location = self.shares[0][2]
            os.remove(share_location)
        d.addCallback(lambda ign:
            _remove_share_0_from_server_0())
        # Set happy = 4 in the client.
        def _prepare():
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
            return client
        d.addCallback(lambda ign:
            _prepare())
        # Uploading data should fail
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError,
                            "test_problem_layout_comment_52_test_1",
                            "shares could be placed or found on 4 server(s), "
                            "but they are not spread out evenly enough to "
                            "ensure that any 3 of these servers would have "
                            "enough shares to recover the file. "
                            "We were asked to place shares on at "
                            "least 4 servers such that any 3 of them have "
                            "enough shares to recover the file",
                            client.upload, upload.Data("data" * 10000,
                                                       convergence="")))

        # Do comment:52, but like this:
        # server 2: empty
        # server 3: share 0, read-only
        # server 1: share 0, read-only
        # server 0: shares 0-9
        d.addCallback(lambda ign:
            _basedir())
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server(server_number=2))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=0,
                                        readonly=True))
        def _prepare2():
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
            return client
        d.addCallback(lambda ign:
            _prepare2())
        d.addCallback(lambda client:
            self.shouldFail(UploadUnhappinessError,
                            "test_problem_layout_comment_52_test_2",
                            "shares could be placed on only 3 server(s) such "
                            "that any 3 of them have enough shares to recover "
                            "the file, but we were asked to place shares on "
                            "at least 4 such servers.",
                            client.upload, upload.Data("data" * 10000,
                                                       convergence="")))
        return d


    def test_problem_layout_comment_53(self):
        # This scenario is at
        # http://allmydata.org/trac/tahoe/ticket/778#comment:53
        #
        # Set up the grid to have one server
        def _change_basedir(ign):
            self.basedir = self.mktemp()
        _change_basedir(None)
        # We start by uploading all of the shares to one server.
        # Next, we'll add three new servers to our NoNetworkGrid. We'll add
        # one share from our initial upload to each of these.
        # The counterintuitive ordering of the share numbers is to deal with
        # the permuting of these servers -- distributing the shares this
        # way ensures that the Tahoe2PeerSelector sees them in the order
        # described below.
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1))
        # So, we now have the following layout:
        # server 0: shares 0 - 9
        # server 1: share 2
        # server 2: share 0
        # server 3: share 1
        # We change the 'happy' parameter in the client to 4.
        # The Tahoe2PeerSelector will see the peers permuted as:
        # 2, 3, 1, 0
        # Ideally, a reupload of our original data should work.
        def _reset_encoding_parameters(ign, happy=4):
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
            return client
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))


        # This scenario is basically comment:53, but changed so that the
        # Tahoe2PeerSelector sees the server with all of the shares before
        # any of the other servers.
        # The layout is:
        # server 2: shares 0 - 9
        # server 3: share 0
        # server 1: share 1
        # server 4: share 2
        # The Tahoe2PeerSelector sees the peers permuted as:
        # 2, 3, 1, 4
        # Note that server 0 has been replaced by server 4; this makes it
        # easier to ensure that the last server seen by Tahoe2PeerSelector
        # has only one share.
        d.addCallback(_change_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2))
        # Copy all of the other shares to server number 2
        def _copy_shares(ign):
            for i in xrange(0, 10):
                self._copy_share_to_server(i, 2)
        d.addCallback(_copy_shares)
        # Remove the first server, and add a placeholder with share 0
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=4, share_number=0))
        # Now try uploading.
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))


        # Try the same thing, but with empty servers after the first one
        # We want to make sure that Tahoe2PeerSelector will redistribute
        # shares as necessary, not simply discover an existing layout.
        # The layout is:
        # server 2: shares 0 - 9
        # server 3: empty
        # server 1: empty
        # server 4: empty
        d.addCallback(_change_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(lambda ign:
            self._add_server(server_number=2))
        d.addCallback(lambda ign:
            self._add_server(server_number=3))
        d.addCallback(lambda ign:
            self._add_server(server_number=1))
        d.addCallback(lambda ign:
            self._add_server(server_number=4))
        d.addCallback(_copy_shares)
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        # Make sure that only as many shares as necessary to satisfy
        # servers of happiness were pushed.
        d.addCallback(lambda results:
            self.failUnlessEqual(results.pushed_shares, 3))
        return d


    def test_happiness_with_some_readonly_peers(self):
        # Try the following layout
        # server 2: shares 0-9
        # server 4: share 0, read-only
        # server 3: share 1, read-only
        # server 1: share 2, read-only
        self.basedir = self.mktemp()
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2,
                                        readonly=True))
        # Copy all of the other shares to server number 2
        def _copy_shares(ign):
            for i in xrange(1, 10):
                self._copy_share_to_server(i, 2)
        d.addCallback(_copy_shares)
        # Remove server 0, and add another in its place
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=4, share_number=0,
                                        readonly=True))
        def _reset_encoding_parameters(ign, happy=4):
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
            return client
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        return d


    def test_happiness_with_all_readonly_peers(self):
        # server 3: share 1, read-only
        # server 1: share 2, read-only
        # server 2: shares 0-9, read-only
        # server 4: share 0, read-only
        # The idea with this test is to make sure that the survey of
        # read-only peers doesn't undercount servers of happiness
        self.basedir = self.mktemp()
        d = self._setup_and_upload()
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=4, share_number=0,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=3, share_number=1,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=1, share_number=2,
                                        readonly=True))
        d.addCallback(lambda ign:
            self._add_server_with_share(server_number=2, share_number=0,
                                        readonly=True))
        def _copy_shares(ign):
            for i in xrange(1, 10):
                self._copy_share_to_server(i, 2)
        d.addCallback(_copy_shares)
        d.addCallback(lambda ign:
            self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
        def _reset_encoding_parameters(ign, happy=4):
            client = self.g.clients[0]
            client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
            return client
        d.addCallback(_reset_encoding_parameters)
        d.addCallback(lambda client:
            client.upload(upload.Data("data" * 10000, convergence="")))
        return d


    def test_dropped_servers_in_encoder(self):
        # The Encoder does its own "servers_of_happiness" check if it
        # happens to lose a bucket during an upload (it assumes that
        # the layout presented to it satisfies "servers_of_happiness"
        # until a failure occurs)
        #
        # This test simulates an upload where servers break after peer
        # selection, but before they are written to.
        def _set_basedir(ign=None):
            self.basedir = self.mktemp()
        _set_basedir()
        d = self._setup_and_upload()
        # Add 5 servers
        def _do_server_setup(ign):
            self._add_server(server_number=1)
            self._add_server(server_number=2)
            self._add_server(server_number=3)
            self._add_server(server_number=4)
            self._add_server(server_number=5)
        d.addCallback(_do_server_setup)
        # remove the original server
        # (necessary to ensure that the Tahoe2PeerSelector will distribute
        #  all the shares)
        def _remove_server(ign):
            server = self.g.servers_by_number[0]
            self.g.remove_server(server.my_nodeid)
        d.addCallback(_remove_server)
        # This should succeed; we still have 4 servers, and the
        # happiness of the upload is 4.
        d.addCallback(lambda ign:
            self._do_upload_with_broken_servers(1))
        # Now, do the same thing over again, but drop 2 servers instead
        # of 1. This should fail, because servers_of_happiness is 4 and
        # we can't satisfy that.
        d.addCallback(_set_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(_do_server_setup)
        d.addCallback(_remove_server)
        d.addCallback(lambda ign:
            self.shouldFail(UploadUnhappinessError,
                            "test_dropped_servers_in_encoder",
                            "shares could be placed on only 3 server(s) "
                            "such that any 3 of them have enough shares to "
                            "recover the file, but we were asked to place "
                            "shares on at least 4",
                            self._do_upload_with_broken_servers, 2))
        # Now do the same thing over again, but make some of the servers
        # readonly, break some of the ones that aren't, and make sure that
        # happiness accounting is preserved.
        d.addCallback(_set_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        def _do_server_setup_2(ign):
            self._add_server(1)
            self._add_server(2)
            self._add_server(3)
            self._add_server_with_share(4, 7, readonly=True)
            self._add_server_with_share(5, 8, readonly=True)
        d.addCallback(_do_server_setup_2)
        d.addCallback(_remove_server)
        d.addCallback(lambda ign:
            self._do_upload_with_broken_servers(1))
        d.addCallback(_set_basedir)
        d.addCallback(lambda ign:
            self._setup_and_upload())
        d.addCallback(_do_server_setup_2)
        d.addCallback(_remove_server)
        d.addCallback(lambda ign:
            self.shouldFail(UploadUnhappinessError,
                            "test_dropped_servers_in_encoder",
                            "shares could be placed on only 3 server(s) "
                            "such that any 3 of them have enough shares to "
                            "recover the file, but we were asked to place "
                            "shares on at least 4",
                            self._do_upload_with_broken_servers, 2))
        return d


    def test_merge_peers(self):
        # merge_peers merges a list of used_peers and a dict of
        # shareid -> peerid mappings.
        shares = {
                    1 : set(["server1"]),
                    2 : set(["server2"]),
                    3 : set(["server3"]),
                    4 : set(["server4", "server5"]),
                    5 : set(["server1", "server2"]),
                 }
        # if not provided with a used_peers argument, it should just
        # return the first argument unchanged.
        self.failUnlessEqual(shares, merge_peers(shares, set([])))
        class FakePeerTracker:
            pass
        trackers = []
        for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
            t = FakePeerTracker()
            t.peerid = server
            t.buckets = [i]
            trackers.append(t)
        expected = {
                    1 : set(["server1"]),
                    2 : set(["server2"]),
                    3 : set(["server3"]),
                    4 : set(["server4", "server5"]),
                    5 : set(["server1", "server2", "server5"]),
                    6 : set(["server6"]),
                    7 : set(["server7"]),
                    8 : set(["server8"]),
                   }
        self.failUnlessEqual(expected, merge_peers(shares, set(trackers)))
        shares2 = {}
        expected = {
                    5 : set(["server5"]),
                    6 : set(["server6"]),
                    7 : set(["server7"]),
                    8 : set(["server8"]),
                   }
        self.failUnlessEqual(expected, merge_peers(shares2, set(trackers)))
        shares3 = {}
        trackers = []
        expected = {}
        for (i, server) in [(i, "server%d" % i) for i in xrange(10)]:
            shares3[i] = set([server])
            t = FakePeerTracker()
            t.peerid = server
            t.buckets = [i]
            trackers.append(t)
            expected[i] = set([server])
        self.failUnlessEqual(expected, merge_peers(shares3, set(trackers)))


1264     def test_servers_of_happiness_utility_function(self):
1265         # These tests are concerned with the servers_of_happiness()
1266         # utility function, and its underlying matching algorithm. Other
1267         # aspects of the servers_of_happiness behavior are tested
1268         # elsewhere. These tests exist to ensure that
1269         # servers_of_happiness doesn't under- or over-count the happiness
1270         # value for given inputs.
1271
1272         # servers_of_happiness expects a dict of 
1273         # shnum => set(peerids) as a preexisting shares argument.
1274         test1 = {
1275                  1 : set(["server1"]),
1276                  2 : set(["server2"]),
1277                  3 : set(["server3"]),
1278                  4 : set(["server4"])
1279                 }
1280         happy = servers_of_happiness(test1)
1281         self.failUnlessEqual(4, happy)
1282         test1[4] = set(["server1"])
1283         # We've added a duplicate server, so now servers_of_happiness
1284         # should be 3 instead of 4.
1285         happy = servers_of_happiness(test1)
1286         self.failUnlessEqual(3, happy)
1287         # The second argument of merge_peers should be a set of 
1288         # objects with peerid and buckets as attributes. In actual use, 
1289         # these will be PeerTracker instances, but for testing it is fine 
1290         # to use a FakePeerTracker that does nothing but hold those
1291         # attributes.
1292         class FakePeerTracker:
1293             pass
1294         trackers = []
1295         for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1296             t = FakePeerTracker()
1297             t.peerid = server
1298             t.buckets = [i]
1299             trackers.append(t)
1300         # Recall that test1 is a server layout with servers_of_happiness
1301         # = 3.  Since there isn't any overlap between the shnum ->
1302         # set([peerid]) correspondences in test1 and those in trackers,
1303         # the result here should be 7.
1304         test2 = merge_peers(test1, set(trackers))
1305         happy = servers_of_happiness(test2)
1306         self.failUnlessEqual(7, happy)
1307         # Now add an overlapping server to trackers. This is redundant,
1308         # so it should not cause the previously reported happiness value
1309         # to change.
1310         t = FakePeerTracker()
1311         t.peerid = "server1"
1312         t.buckets = [1]
1313         trackers.append(t)
1314         test2 = merge_peers(test1, set(trackers))
1315         happy = servers_of_happiness(test2)
1316         self.failUnlessEqual(7, happy)
1317         test = {}
1318         happy = servers_of_happiness(test)
1319         self.failUnlessEqual(0, happy)
1320         # Test a more substantial overlap between the trackers and the 
1321         # existing assignments.
1322         test = {
1323             1 : set(['server1']),
1324             2 : set(['server2']),
1325             3 : set(['server3']),
1326             4 : set(['server4']),
1327         }
1328         trackers = []
1329         t = FakePeerTracker()
1330         t.peerid = 'server5'
1331         t.buckets = [4]
1332         trackers.append(t)
1333         t = FakePeerTracker()
1334         t.peerid = 'server6'
1335         t.buckets = [3, 5]
1336         trackers.append(t)
1337         # The value returned by servers_of_happiness is the size of a
1338         # maximum matching in the bipartite graph that servers_of_happiness()
1339         # makes between peerids and share numbers (see the sketch after
1340         # this test). It should find something like this:
1341         # (server 1, share 1)
1342         # (server 2, share 2)
1343         # (server 3, share 3)
1344         # (server 5, share 4)
1345         # (server 6, share 5)
1346         # 
1347         # and, since there are 5 edges in this matching, it should
1348         # return 5.
1349         test2 = merge_peers(test, set(trackers))
1350         happy = servers_of_happiness(test2)
1351         self.failUnlessEqual(5, happy)
1352         # Zooko's first puzzle: 
1353         # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:156)
1354         #
1355         # server 1: shares 0, 1
1356         # server 2: shares 1, 2
1357         # server 3: share 2
1358         #
1359         # This should yield happiness of 3.
1360         test = {
1361             0 : set(['server1']),
1362             1 : set(['server1', 'server2']),
1363             2 : set(['server2', 'server3']),
1364         }
1365         self.failUnlessEqual(3, servers_of_happiness(test))
1366         # Zooko's second puzzle:        
1367         # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:158)
1368         # 
1369         # server 1: shares 0, 1
1370         # server 2: share 1
1371         # 
1372         # This should yield happiness of 2.
1373         test = {
1374             0 : set(['server1']),
1375             1 : set(['server1', 'server2']),
1376         }
1377         self.failUnlessEqual(2, servers_of_happiness(test))
1378
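    # A minimal sketch of the quantity these assertions pin down, assuming
    # only what the comments above state: the size of a maximum matching in
    # the bipartite graph between share numbers and peerids. This
    # hypothetical helper uses the classic augmenting-path search; the real
    # servers_of_happiness() may be implemented quite differently. On
    # Zooko's puzzles above it yields 3 and 2, matching the assertions.
    @staticmethod
    def _sketch_happiness(sharemap):
        matched = {}  # peerid -> the shnum currently matched to it
        def _augment(shnum, seen):
            # Try to match shnum, evicting an earlier match when that
            # match can itself be moved to a different peerid.
            for peerid in sharemap.get(shnum, set()):
                if peerid in seen:
                    continue
                seen.add(peerid)
                if peerid not in matched or _augment(matched[peerid], seen):
                    matched[peerid] = shnum
                    return True
            return False
        happiness = 0
        for shnum in sharemap:
            if _augment(shnum, set()):
                happiness += 1
        return happiness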
1379
1380     def test_shares_by_server(self):
1381         test = dict([(i, set(["server%d" % i])) for i in xrange(1, 5)])
1382         sbs = shares_by_server(test)
1383         self.failUnlessEqual(set([1]), sbs["server1"])
1384         self.failUnlessEqual(set([2]), sbs["server2"])
1385         self.failUnlessEqual(set([3]), sbs["server3"])
1386         self.failUnlessEqual(set([4]), sbs["server4"])
1387         test1 = {
1388                     1 : set(["server1"]),
1389                     2 : set(["server1"]),
1390                     3 : set(["server1"]),
1391                     4 : set(["server2"]),
1392                     5 : set(["server2"])
1393                 }
1394         sbs = shares_by_server(test1)
1395         self.failUnlessEqual(set([1, 2, 3]), sbs["server1"])
1396         self.failUnlessEqual(set([4, 5]), sbs["server2"])
1397         # This should fail unless the peerid part of the mapping is a set
1398         test2 = {1: "server1"}
1399         self.shouldFail(AssertionError,
1400                        "test_shares_by_server",
1401                        "",
1402                        shares_by_server, test2)
1403
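    # A minimal sketch of the inversion this test exercises (hypothetical;
    # not the shares_by_server that the upload code imports): turn a
    # shareid -> set(peerids) map inside-out into peerid -> set(shareids),
    # asserting that each value is a set, which is why the malformed
    # mapping above fails with an AssertionError.
    @staticmethod
    def _sketch_shares_by_server(servermap):
        inverted = {}
        for (shnum, peerids) in servermap.items():
            assert isinstance(peerids, set)
            for peerid in peerids:
                inverted.setdefault(peerid, set()).add(shnum)
        return inverted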
1404
1405     def test_existing_share_detection(self):
1406         self.basedir = self.mktemp()
1407         d = self._setup_and_upload()
1408         # Our final setup should look like this:
1409         # server 1: shares 0 - 9, read-only
1410         # server 2: empty
1411         # server 3: empty
1412         # server 4: empty
1413         # The purpose of this test is to make sure that the peer selector
1414         # knows about the shares on server 1, even though it is read-only.
1415         # It used to simply filter these out, which would cause the test
1416         # to fail when servers_of_happiness = 4.
1417         d.addCallback(lambda ign:
1418             self._add_server_with_share(1, 0, True))
1419         d.addCallback(lambda ign:
1420             self._add_server(2))
1421         d.addCallback(lambda ign:
1422             self._add_server(3))
1423         d.addCallback(lambda ign:
1424             self._add_server(4))
1425         def _copy_shares(ign):
1426             for i in xrange(1, 10):
1427                 self._copy_share_to_server(i, 1)
1428         d.addCallback(_copy_shares)
1429         d.addCallback(lambda ign:
1430             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1431         def _prepare_client(ign):
1432             client = self.g.clients[0]
1433             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1434             return client
1435         d.addCallback(_prepare_client)
1436         d.addCallback(lambda client:
1437             client.upload(upload.Data("data" * 10000, convergence="")))
1438         return d
1439
1440
1441     def test_query_counting(self):
1442         # If peer selection fails, Tahoe2PeerSelector prints out a lot
1443         # of helpful diagnostic information, including query stats.
1444         # This test helps make sure that that information is accurate.
1445         self.basedir = self.mktemp()
1446         d = self._setup_and_upload()
1447         def _setup(ign):
1448             for i in xrange(1, 11):
1449                 self._add_server(server_number=i)
1450             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1451             c = self.g.clients[0]
1452             # We set happy to an unsatisfiable value so that we can check the 
1453             # counting in the exception message. The same progress message
1454             # is also used when the upload is successful, but in that case it
1455             # only gets written to a log, so we can't see what it says.
1456             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1457             return c
1458         d.addCallback(_setup)
1459         d.addCallback(lambda c:
1460             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1461                             "10 queries placed some shares",
1462                             c.upload, upload.Data("data" * 10000,
1463                                                   convergence="")))
1464         # Now try with some readonly servers. We want to make sure that
1465         # the readonly peer share discovery phase is counted correctly.
1466         def _reset(ign):
1467             self.basedir = self.mktemp()
1468             self.g = None
1469         d.addCallback(_reset)
1470         d.addCallback(lambda ign:
1471             self._setup_and_upload())
1472         def _then(ign):
1473             for i in xrange(1, 11):
1474                 self._add_server(server_number=i)
1475             self._add_server(server_number=11, readonly=True)
1476             self._add_server(server_number=12, readonly=True)
1477             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1478             c = self.g.clients[0]
1479             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1480             return c
1481         d.addCallback(_then)
1482         d.addCallback(lambda c:
1483             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1484                             "2 placed none (of which 2 placed none due to "
1485                             "the server being full",
1486                             c.upload, upload.Data("data" * 10000,
1487                                                   convergence="")))
1488         # Now try the case where the upload process finds a bunch of the
1489         # shares that it wants to place on the first server, including
1490         # the one that it wanted to allocate there. Though no shares will
1491         # be allocated in this request, it should still be called
1492         # productive, since it causes some homeless shares to be
1493         # removed.
1494         d.addCallback(_reset)
1495         d.addCallback(lambda ign:
1496             self._setup_and_upload())
1497
1498         def _next(ign):
1499             for i in xrange(1, 11):
1500                 self._add_server(server_number=i)
1501             # Copy all of the shares to server 9, since that will be
1502             # the first one that the selector sees.
1503             for i in xrange(10):
1504                 self._copy_share_to_server(i, 9)
1505             # Remove server 0, and its contents
1506             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1507             # Make happiness unsatisfiable
1508             c = self.g.clients[0]
1509             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 45
1510             return c
1511         d.addCallback(_next)
1512         d.addCallback(lambda c:
1513             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1514                             "1 queries placed some shares",
1515                             c.upload, upload.Data("data" * 10000,
1516                                                   convergence="")))
1517         return d
1518
1519
1520     def test_upper_limit_on_readonly_queries(self):
1521         self.basedir = self.mktemp()
1522         d = self._setup_and_upload()
1523         def _then(ign):
1524             for i in xrange(1, 11):
1525                 self._add_server(server_number=i, readonly=True)
1526             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1527             c = self.g.clients[0]
1528             c.DEFAULT_ENCODING_PARAMETERS['k'] = 2
1529             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1530             c.DEFAULT_ENCODING_PARAMETERS['n'] = 4
1531             return c
1532         d.addCallback(_then)
1533         d.addCallback(lambda client:
1534             self.shouldFail(UploadUnhappinessError,
1535                             "test_upper_limit_on_readonly_queries",
1536                             "sent 8 queries to 8 peers",
1537                             client.upload,
1538                             upload.Data('data' * 10000, convergence="")))
1539         return d
1540
1541
1542     def test_exception_messages_during_peer_selection(self):
1543         # server 1: read-only, no shares
1544         # server 2: read-only, no shares
1545         # server 3: read-only, no shares
1546         # server 4: read-only, no shares
1547         # server 5: read-only, no shares
1548         # This will fail, but we want to make sure that the log messages
1549         # are informative about why it has failed.
1550         self.basedir = self.mktemp()
1551         d = self._setup_and_upload()
1552         d.addCallback(lambda ign:
1553             self._add_server(server_number=1, readonly=True))
1554         d.addCallback(lambda ign:
1555             self._add_server(server_number=2, readonly=True))
1556         d.addCallback(lambda ign:
1557             self._add_server(server_number=3, readonly=True))
1558         d.addCallback(lambda ign:
1559             self._add_server(server_number=4, readonly=True))
1560         d.addCallback(lambda ign:
1561             self._add_server(server_number=5, readonly=True))
1562         d.addCallback(lambda ign:
1563             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1564         def _reset_encoding_parameters(ign, happy=4):
1565             client = self.g.clients[0]
1566             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1567             return client
1568         d.addCallback(_reset_encoding_parameters)
1569         d.addCallback(lambda client:
1570             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1571                             "placed 0 shares out of 10 "
1572                             "total (10 homeless), want to place shares on at "
1573                             "least 4 servers such that any 3 of them have "
1574                             "enough shares to recover the file, "
1575                             "sent 5 queries to 5 peers, 0 queries placed "
1576                             "some shares, 5 placed none "
1577                             "(of which 5 placed none due to the server being "
1578                             "full and 0 placed none due to an error)",
1579                             client.upload,
1580                             upload.Data("data" * 10000, convergence="")))
1581
1582
1583         # server 1: read-only, no shares
1584         # server 2: broken, no shares
1585         # server 3: read-only, no shares
1586         # server 4: read-only, no shares
1587         # server 5: read-only, no shares
1588         def _reset(ign):
1589             self.basedir = self.mktemp()
1590         d.addCallback(_reset)
1591         d.addCallback(lambda ign:
1592             self._setup_and_upload())
1593         d.addCallback(lambda ign:
1594             self._add_server(server_number=1, readonly=True))
1595         d.addCallback(lambda ign:
1596             self._add_server(server_number=2))
1597         def _break_server_2(ign):
1598             server = self.g.servers_by_number[2].my_nodeid
1599             # We have to break the server in servers_by_id, 
1600             # because the one in servers_by_number isn't wrapped,
1601             # and doesn't look at its broken attribute when answering
1602             # queries.
1603             self.g.servers_by_id[server].broken = True
1604         d.addCallback(_break_server_2)
1605         d.addCallback(lambda ign:
1606             self._add_server(server_number=3, readonly=True))
1607         d.addCallback(lambda ign:
1608             self._add_server(server_number=4, readonly=True))
1609         d.addCallback(lambda ign:
1610             self._add_server(server_number=5, readonly=True))
1611         d.addCallback(lambda ign:
1612             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1613         d.addCallback(_reset_encoding_parameters)
1614         d.addCallback(lambda client:
1615             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1616                             "placed 0 shares out of 10 "
1617                             "total (10 homeless), want to place shares on at "
1618                             "least 4 servers such that any 3 of them have "
1619                             "enough shares to recover the file, "
1620                             "sent 5 queries to 5 peers, 0 queries placed "
1621                             "some shares, 5 placed none "
1622                             "(of which 4 placed none due to the server being "
1623                             "full and 1 placed none due to an error)",
1624                             client.upload,
1625                             upload.Data("data" * 10000, convergence="")))
1626         # server 0, server 1 = empty, accepting shares
1627         # This should place all of the shares, but still fail with happy=4.
1628         # We want to make sure that the exception message is worded correctly.
1629         d.addCallback(_reset)
1630         d.addCallback(lambda ign:
1631             self._setup_grid())
1632         d.addCallback(lambda ign:
1633             self._add_server(server_number=1))
1634         d.addCallback(_reset_encoding_parameters)
1635         d.addCallback(lambda client:
1636             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1637                             "shares could be placed or found on only 2 "
1638                             "server(s). We were asked to place shares on at "
1639                             "least 4 server(s) such that any 3 of them have "
1640                             "enough shares to recover the file.",
1641                             client.upload, upload.Data("data" * 10000,
1642                                                        convergence="")))
1643         # servers 0 - 4 = empty, accepting shares
1644         # This too should place all the shares, and this too should fail,
1645         # but since the effective happiness is more than the k encoding
1646         # parameter, it should trigger a different error message than the one
1647         # above.
1648         d.addCallback(_reset)
1649         d.addCallback(lambda ign:
1650             self._setup_grid())
1651         d.addCallback(lambda ign:
1652             self._add_server(server_number=1))
1653         d.addCallback(lambda ign:
1654             self._add_server(server_number=2))
1655         d.addCallback(lambda ign:
1656             self._add_server(server_number=3))
1657         d.addCallback(lambda ign:
1658             self._add_server(server_number=4))
1659         d.addCallback(_reset_encoding_parameters, happy=7)
1660         d.addCallback(lambda client:
1661             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1662                             "shares could be placed on only 5 server(s) such "
1663                             "that any 3 of them have enough shares to recover "
1664                             "the file, but we were asked to place shares on "
1665                             "at least 7 such servers.",
1666                             client.upload, upload.Data("data" * 10000,
1667                                                        convergence="")))
1668         # server 0: shares 0 - 9
1669         # server 1: share 0, read-only
1670         # server 2: share 0, read-only
1671         # server 3: share 0, read-only
1672         # This should place all of the shares, but fail with happy=4.
1673         # Since the number of servers with shares is more than the number
1674         # necessary to reconstitute the file, this will trigger a different
1675         # error message than either of those above. 
1676         d.addCallback(_reset)
1677         d.addCallback(lambda ign:
1678             self._setup_and_upload())
1679         d.addCallback(lambda ign:
1680             self._add_server_with_share(server_number=1, share_number=0,
1681                                         readonly=True))
1682         d.addCallback(lambda ign:
1683             self._add_server_with_share(server_number=2, share_number=0,
1684                                         readonly=True))
1685         d.addCallback(lambda ign:
1686             self._add_server_with_share(server_number=3, share_number=0,
1687                                         readonly=True))
1688         d.addCallback(_reset_encoding_parameters, happy=7)
1689         d.addCallback(lambda client:
1690             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1691                             "shares could be placed or found on 4 server(s), "
1692                             "but they are not spread out evenly enough to "
1693                             "ensure that any 3 of these servers would have "
1694                             "enough shares to recover the file. We were asked "
1695                             "to place shares on at least 7 servers such that "
1696                             "any 3 of them have enough shares to recover the "
1697                             "file",
1698                             client.upload, upload.Data("data" * 10000,
1699                                                        convergence="")))
1700         return d
1701
1702
1703     def test_problem_layout_comment_187(self):
1704         # #778 comment 187 broke an initial attempt at a share
1705         # redistribution algorithm. This test is here to demonstrate the
1706         # breakage, and to test that subsequent algorithms don't also
1707         # break in the same way.
1708         self.basedir = self.mktemp()
1709         d = self._setup_and_upload(k=2, n=3)
1710
1711         # server 1: shares 0, 1, 2, readonly
1712         # server 2: share 0, readonly
1713         # server 3: share 0
1714         def _setup(ign):
1715             self._add_server_with_share(server_number=1, share_number=0,
1716                                         readonly=True)
1717             self._add_server_with_share(server_number=2, share_number=0,
1718                                         readonly=True)
1719             self._add_server_with_share(server_number=3, share_number=0)
1720             # Copy shares
1721             self._copy_share_to_server(1, 1)
1722             self._copy_share_to_server(2, 1)
1723             # Remove server 0
1724             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1725             client = self.g.clients[0]
1726             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 3
1727             return client
1728
1729         d.addCallback(_setup)
1730         d.addCallback(lambda client:
1731             client.upload(upload.Data("data" * 10000, convergence="")))
1732         return d
1733     test_problem_layout_comment_187.todo = "this isn't fixed yet"
1734
1735
1736     def test_upload_succeeds_with_some_homeless_shares(self):
1737         # If the upload is forced to stop trying to place shares before
1738         # it has placed (or otherwise accounted for) all of them, but it
1739         # has placed enough to satisfy the upload health criteria that
1740         # we're using, it should still succeed.
1741         self.basedir = self.mktemp()
1742         d = self._setup_and_upload()
1743         def _server_setup(ign):
1744             # Add four servers so that we have a layout like this:
1745             # server 1: share 0, read-only
1746             # server 2: share 1, read-only
1747             # server 3: share 2, read-only
1748             # server 4: share 3, read-only
1749             # If we set happy = 4, the upload will manage to satisfy
1750             # servers of happiness, but not place all of the shares; we
1751             # want to test that the upload is declared successful in
1752             # this case.
1753             self._add_server_with_share(server_number=1, share_number=0,
1754                                         readonly=True)
1755             self._add_server_with_share(server_number=2, share_number=1,
1756                                         readonly=True)
1757             self._add_server_with_share(server_number=3, share_number=2,
1758                                         readonly=True)
1759             self._add_server_with_share(server_number=4, share_number=3,
1760                                         readonly=True)
1761             # Remove server 0.
1762             self.g.remove_server(self.g.servers_by_number[0].my_nodeid)
1763             # Set the client appropriately
1764             c = self.g.clients[0]
1765             c.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1766             return c
1767         d.addCallback(_server_setup)
1768         d.addCallback(lambda client:
1769             client.upload(upload.Data("data" * 10000, convergence="")))
1770         return d
1771
1772
1773     def test_uploader_skips_over_servers_with_only_one_share(self):
1774         # We want to make sure that the redistribution logic ignores
1775         # servers with only one share, since placing these shares
1776         # elsewhere will at best keep happiness the same as it was, and
1777         # at worst hurt it.
1778         self.basedir = self.mktemp()
1779         d = self._setup_and_upload()
1780         def _server_setup(ign):
1781             # Add some servers so that the upload will need to
1782             # redistribute, but will first pass over a couple of servers
1783             # that don't have enough shares to redistribute before
1784             # finding one that does.
1785             self._add_server_with_share(server_number=1, share_number=0)
1786             self._add_server_with_share(server_number=2, share_number=2)
1787             self._add_server_with_share(server_number=3, share_number=1)
1788             self._add_server_with_share(server_number=8, share_number=4)
1789             self._add_server_with_share(server_number=5, share_number=5)
1790             self._add_server_with_share(server_number=10, share_number=7)
1791             for i in xrange(4):
1792                 self._copy_share_to_server(i, 2)
1793             return self.g.clients[0]
1794         d.addCallback(_server_setup)
1795         d.addCallback(lambda client:
1796             client.upload(upload.Data("data" * 10000, convergence="")))
1797         return d
1798
1799
1800     def _set_up_nodes_extra_config(self, clientdir):
1801         cfgfn = os.path.join(clientdir, "tahoe.cfg")
1802         oldcfg = open(cfgfn, "r").read()
1803         f = open(cfgfn, "wt")
1804         f.write(oldcfg)
1805         f.write("\n")
1806         f.write("[client]\n")
1807         f.write("shares.needed = 7\n")
1808         f.write("shares.total = 12\n")
1809         f.write("\n")
1810         f.close()
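        # Each client's tahoe.cfg now ends with the section written above,
        # overriding the default encoding parameters for the system tests
        # that use this hook:
        #
        #   [client]
        #   shares.needed = 7
        #   shares.total = 12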
1811         return None
1812
1813 # TODO:
1814 #  upload with exactly 75 peers (servers_of_happiness)
1815 #  have a download fail
1816 #  cancel a download (need to implement more cancel stuff)