]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_upload.py
Replace "UploadHappinessError" with "UploadUnhappinessError" in tests.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_upload.py
1
2 import os, shutil
3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.python.failure import Failure
6 from twisted.python import log
7 from twisted.internet import defer
8 from foolscap.api import fireEventually
9
10 import allmydata # for __full_version__
11 from allmydata import uri, monitor, client
12 from allmydata.immutable import upload, encode
13 from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
14 from allmydata.util.assertutil import precondition
15 from allmydata.util.deferredutil import DeferredListShouldSucceed
16 from no_network import GridTestMixin
17 from common_util import ShouldFailMixin
18 from allmydata.storage_client import StorageFarmBroker
19 from allmydata.storage.server import storage_index_to_dir
20
21 MiB = 1024*1024
22
23 def extract_uri(results):
24     return results.uri
25
26 # Some of these took longer than 480 seconds on Zandr's arm box, but this may
27 # have been due to an earlier test ERROR'ing out due to timeout, which seems
28 # to screw up subsequent tests.
29 timeout = 960
30
31 class Uploadable(unittest.TestCase):
32     def shouldEqual(self, data, expected):
33         self.failUnless(isinstance(data, list))
34         for e in data:
35             self.failUnless(isinstance(e, str))
36         s = "".join(data)
37         self.failUnlessEqual(s, expected)
38
39     def test_filehandle_random_key(self):
40         return self._test_filehandle(convergence=None)
41
42     def test_filehandle_convergent_encryption(self):
43         return self._test_filehandle(convergence="some convergence string")
44
45     def _test_filehandle(self, convergence):
46         s = StringIO("a"*41)
47         u = upload.FileHandle(s, convergence=convergence)
48         d = u.get_size()
49         d.addCallback(self.failUnlessEqual, 41)
50         d.addCallback(lambda res: u.read(1))
51         d.addCallback(self.shouldEqual, "a")
52         d.addCallback(lambda res: u.read(80))
53         d.addCallback(self.shouldEqual, "a"*40)
54         d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
55         d.addCallback(lambda res: s.close()) # that privilege is reserved for us
56         return d
57
58     def test_filename(self):
59         basedir = "upload/Uploadable/test_filename"
60         os.makedirs(basedir)
61         fn = os.path.join(basedir, "file")
62         f = open(fn, "w")
63         f.write("a"*41)
64         f.close()
65         u = upload.FileName(fn, convergence=None)
66         d = u.get_size()
67         d.addCallback(self.failUnlessEqual, 41)
68         d.addCallback(lambda res: u.read(1))
69         d.addCallback(self.shouldEqual, "a")
70         d.addCallback(lambda res: u.read(80))
71         d.addCallback(self.shouldEqual, "a"*40)
72         d.addCallback(lambda res: u.close())
73         return d
74
75     def test_data(self):
76         s = "a"*41
77         u = upload.Data(s, convergence=None)
78         d = u.get_size()
79         d.addCallback(self.failUnlessEqual, 41)
80         d.addCallback(lambda res: u.read(1))
81         d.addCallback(self.shouldEqual, "a")
82         d.addCallback(lambda res: u.read(80))
83         d.addCallback(self.shouldEqual, "a"*40)
84         d.addCallback(lambda res: u.close())
85         return d
86
87 class ServerError(Exception):
88     pass
89
90 class SetDEPMixin:
91     def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
92         p = {"k": k,
93              "happy": happy,
94              "n": n,
95              "max_segment_size": max_segsize,
96              }
97         self.node.DEFAULT_ENCODING_PARAMETERS = p
98
99 class FakeStorageServer:
100     def __init__(self, mode):
101         self.mode = mode
102         self.allocated = []
103         self.queries = 0
104         self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
105                          { "maximum-immutable-share-size": 2**32 },
106                          "application-version": str(allmydata.__full_version__),
107                          }
108         if mode == "small":
109             self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
110                              { "maximum-immutable-share-size": 10 },
111                              "application-version": str(allmydata.__full_version__),
112                              }
113
114
115     def callRemote(self, methname, *args, **kwargs):
116         def _call():
117             meth = getattr(self, methname)
118             return meth(*args, **kwargs)
119         d = fireEventually()
120         d.addCallback(lambda res: _call())
121         return d
122
123     def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
124                          sharenums, share_size, canary):
125         #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
126         if self.mode == "first-fail":
127             if self.queries == 0:
128                 raise ServerError
129         if self.mode == "second-fail":
130             if self.queries == 1:
131                 raise ServerError
132         self.queries += 1
133         if self.mode == "full":
134             return (set(), {},)
135         elif self.mode == "already got them":
136             return (set(sharenums), {},)
137         else:
138             for shnum in sharenums:
139                 self.allocated.append( (storage_index, shnum) )
140             return (set(),
141                     dict([( shnum, FakeBucketWriter(share_size) )
142                           for shnum in sharenums]),
143                     )
144
145 class FakeBucketWriter:
146     # a diagnostic version of storageserver.BucketWriter
147     def __init__(self, size):
148         self.data = StringIO()
149         self.closed = False
150         self._size = size
151
152     def callRemote(self, methname, *args, **kwargs):
153         def _call():
154             meth = getattr(self, "remote_" + methname)
155             return meth(*args, **kwargs)
156         d = fireEventually()
157         d.addCallback(lambda res: _call())
158         return d
159
160     def remote_write(self, offset, data):
161         precondition(not self.closed)
162         precondition(offset >= 0)
163         precondition(offset+len(data) <= self._size,
164                      "offset=%d + data=%d > size=%d" %
165                      (offset, len(data), self._size))
166         self.data.seek(offset)
167         self.data.write(data)
168
169     def remote_close(self):
170         precondition(not self.closed)
171         self.closed = True
172
173     def remote_abort(self):
174         log.err(RuntimeError("uh oh, I was asked to abort"))
175
176 class FakeClient:
177     DEFAULT_ENCODING_PARAMETERS = {"k":25,
178                                    "happy": 75,
179                                    "n": 100,
180                                    "max_segment_size": 1*MiB,
181                                    }
182     def __init__(self, mode="good", num_servers=50):
183         self.num_servers = num_servers
184         if type(mode) is str:
185             mode = dict([i,mode] for i in range(num_servers))
186         peers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
187                   for fakeid in range(self.num_servers) ]
188         self.storage_broker = StorageFarmBroker(None, permute_peers=True)
189         for (serverid, server) in peers:
190             self.storage_broker.test_add_server(serverid, server)
191         self.last_peers = [p[1] for p in peers]
192
193     def log(self, *args, **kwargs):
194         pass
195     def get_encoding_parameters(self):
196         return self.DEFAULT_ENCODING_PARAMETERS
197     def get_storage_broker(self):
198         return self.storage_broker
199     _secret_holder = client.SecretHolder("lease secret", "convergence secret")
200
201 class GotTooFarError(Exception):
202     pass
203
204 class GiganticUploadable(upload.FileHandle):
205     def __init__(self, size):
206         self._size = size
207         self._fp = 0
208
209     def get_encryption_key(self):
210         return defer.succeed("\x00" * 16)
211     def get_size(self):
212         return defer.succeed(self._size)
213     def read(self, length):
214         left = self._size - self._fp
215         length = min(left, length)
216         self._fp += length
217         if self._fp > 1000000:
218             # terminate the test early.
219             raise GotTooFarError("we shouldn't be allowed to get this far")
220         return defer.succeed(["\x00" * length])
221     def close(self):
222         pass
223
224 DATA = """
225 Once upon a time, there was a beautiful princess named Buttercup. She lived
226 in a magical land where every file was stored securely among millions of
227 machines, and nobody ever worried about their data being lost ever again.
228 The End.
229 """
230 assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
231
232 SIZE_ZERO = 0
233 SIZE_SMALL = 16
234 SIZE_LARGE = len(DATA)
235
236 def upload_data(uploader, data):
237     u = upload.Data(data, convergence=None)
238     return uploader.upload(u)
239 def upload_filename(uploader, filename):
240     u = upload.FileName(filename, convergence=None)
241     return uploader.upload(u)
242 def upload_filehandle(uploader, fh):
243     u = upload.FileHandle(fh, convergence=None)
244     return uploader.upload(u)
245
246 class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
247     def setUp(self):
248         self.node = FakeClient(mode="good")
249         self.u = upload.Uploader()
250         self.u.running = True
251         self.u.parent = self.node
252
253     def _check_small(self, newuri, size):
254         u = uri.from_string(newuri)
255         self.failUnless(isinstance(u, uri.LiteralFileURI))
256         self.failUnlessEqual(len(u.data), size)
257
258     def _check_large(self, newuri, size):
259         u = uri.from_string(newuri)
260         self.failUnless(isinstance(u, uri.CHKFileURI))
261         self.failUnless(isinstance(u.get_storage_index(), str))
262         self.failUnlessEqual(len(u.get_storage_index()), 16)
263         self.failUnless(isinstance(u.key, str))
264         self.failUnlessEqual(len(u.key), 16)
265         self.failUnlessEqual(u.size, size)
266
267     def get_data(self, size):
268         return DATA[:size]
269
270     def test_too_large(self):
271         # we've removed the 4GiB share size limit (see ticket #346 for
272         # details), but still have an 8-byte field, so the limit is now
273         # 2**64, so make sure we reject files larger than that.
274         k = 3; happy = 7; n = 10
275         self.set_encoding_parameters(k, happy, n)
276         big = k*(2**64)
277         data1 = GiganticUploadable(big)
278         d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
279                             "This file is too large to be uploaded (data_size)",
280                             self.u.upload, data1)
281         data2 = GiganticUploadable(big-3)
282         d.addCallback(lambda res:
283                       self.shouldFail(FileTooLargeError,
284                                       "test_too_large-data2",
285                                       "This file is too large to be uploaded (offsets)",
286                                       self.u.upload, data2))
287         # I don't know where the actual limit is.. it depends upon how large
288         # the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
289         return d
290
291     def test_data_zero(self):
292         data = self.get_data(SIZE_ZERO)
293         d = upload_data(self.u, data)
294         d.addCallback(extract_uri)
295         d.addCallback(self._check_small, SIZE_ZERO)
296         return d
297
298     def test_data_small(self):
299         data = self.get_data(SIZE_SMALL)
300         d = upload_data(self.u, data)
301         d.addCallback(extract_uri)
302         d.addCallback(self._check_small, SIZE_SMALL)
303         return d
304
305     def test_data_large(self):
306         data = self.get_data(SIZE_LARGE)
307         d = upload_data(self.u, data)
308         d.addCallback(extract_uri)
309         d.addCallback(self._check_large, SIZE_LARGE)
310         return d
311
312     def test_data_large_odd_segments(self):
313         data = self.get_data(SIZE_LARGE)
314         segsize = int(SIZE_LARGE / 2.5)
315         # we want 3 segments, since that's not a power of two
316         self.set_encoding_parameters(25, 75, 100, segsize)
317         d = upload_data(self.u, data)
318         d.addCallback(extract_uri)
319         d.addCallback(self._check_large, SIZE_LARGE)
320         return d
321
322     def test_filehandle_zero(self):
323         data = self.get_data(SIZE_ZERO)
324         d = upload_filehandle(self.u, StringIO(data))
325         d.addCallback(extract_uri)
326         d.addCallback(self._check_small, SIZE_ZERO)
327         return d
328
329     def test_filehandle_small(self):
330         data = self.get_data(SIZE_SMALL)
331         d = upload_filehandle(self.u, StringIO(data))
332         d.addCallback(extract_uri)
333         d.addCallback(self._check_small, SIZE_SMALL)
334         return d
335
336     def test_filehandle_large(self):
337         data = self.get_data(SIZE_LARGE)
338         d = upload_filehandle(self.u, StringIO(data))
339         d.addCallback(extract_uri)
340         d.addCallback(self._check_large, SIZE_LARGE)
341         return d
342
343     def test_filename_zero(self):
344         fn = "Uploader-test_filename_zero.data"
345         f = open(fn, "wb")
346         data = self.get_data(SIZE_ZERO)
347         f.write(data)
348         f.close()
349         d = upload_filename(self.u, fn)
350         d.addCallback(extract_uri)
351         d.addCallback(self._check_small, SIZE_ZERO)
352         return d
353
354     def test_filename_small(self):
355         fn = "Uploader-test_filename_small.data"
356         f = open(fn, "wb")
357         data = self.get_data(SIZE_SMALL)
358         f.write(data)
359         f.close()
360         d = upload_filename(self.u, fn)
361         d.addCallback(extract_uri)
362         d.addCallback(self._check_small, SIZE_SMALL)
363         return d
364
365     def test_filename_large(self):
366         fn = "Uploader-test_filename_large.data"
367         f = open(fn, "wb")
368         data = self.get_data(SIZE_LARGE)
369         f.write(data)
370         f.close()
371         d = upload_filename(self.u, fn)
372         d.addCallback(extract_uri)
373         d.addCallback(self._check_large, SIZE_LARGE)
374         return d
375
376 class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
377     def make_node(self, mode, num_servers=10):
378         self.node = FakeClient(mode, num_servers)
379         self.u = upload.Uploader()
380         self.u.running = True
381         self.u.parent = self.node
382
383     def _check_large(self, newuri, size):
384         u = uri.from_string(newuri)
385         self.failUnless(isinstance(u, uri.CHKFileURI))
386         self.failUnless(isinstance(u.get_storage_index(), str))
387         self.failUnlessEqual(len(u.get_storage_index()), 16)
388         self.failUnless(isinstance(u.key, str))
389         self.failUnlessEqual(len(u.key), 16)
390         self.failUnlessEqual(u.size, size)
391
392     def test_first_error(self):
393         mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
394         self.make_node(mode)
395         d = upload_data(self.u, DATA)
396         d.addCallback(extract_uri)
397         d.addCallback(self._check_large, SIZE_LARGE)
398         return d
399
400     def test_first_error_all(self):
401         self.make_node("first-fail")
402         d = self.shouldFail(UploadUnhappinessError, "first_error_all",
403                             "peer selection failed",
404                             upload_data, self.u, DATA)
405         def _check((f,)):
406             self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
407             # there should also be a 'last failure was' message
408             self.failUnlessIn("ServerError", str(f.value))
409         d.addCallback(_check)
410         return d
411
412     def test_second_error(self):
413         # we want to make sure we make it to a third pass. This means that
414         # the first pass was insufficient to place all shares, and at least
415         # one of second pass servers (other than the last one) accepted a
416         # share (so we'll believe that a third pass will be useful). (if
417         # everyone but the last server throws an error, then we'll send all
418         # the remaining shares to the last server at the end of the second
419         # pass, and if that succeeds, we won't make it to a third pass).
420         #
421         # we can achieve this 97.5% of the time by using 40 servers, having
422         # 39 of them fail on the second request, leaving only one to succeed
423         # on the second request. (we need to keep the number of servers low
424         # enough to ensure a second pass with 100 shares).
425         mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
426         self.make_node(mode, 40)
427         d = upload_data(self.u, DATA)
428         d.addCallback(extract_uri)
429         d.addCallback(self._check_large, SIZE_LARGE)
430         return d
431
432     def test_second_error_all(self):
433         self.make_node("second-fail")
434         d = self.shouldFail(UploadUnhappinessError, "second_error_all",
435                             "peer selection failed",
436                             upload_data, self.u, DATA)
437         def _check((f,)):
438             self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
439             # there should also be a 'last failure was' message
440             self.failUnlessIn("ServerError", str(f.value))
441         d.addCallback(_check)
442         return d
443
444 class FullServer(unittest.TestCase):
445     def setUp(self):
446         self.node = FakeClient(mode="full")
447         self.u = upload.Uploader()
448         self.u.running = True
449         self.u.parent = self.node
450
451     def _should_fail(self, f):
452         self.failUnless(isinstance(f, Failure) and f.check(UploadUnhappinessError), f)
453
454     def test_data_large(self):
455         data = DATA
456         d = upload_data(self.u, data)
457         d.addBoth(self._should_fail)
458         return d
459
460 class PeerSelection(unittest.TestCase):
461
462     def make_client(self, num_servers=50):
463         self.node = FakeClient(mode="good", num_servers=num_servers)
464         self.u = upload.Uploader()
465         self.u.running = True
466         self.u.parent = self.node
467
468     def get_data(self, size):
469         return DATA[:size]
470
471     def _check_large(self, newuri, size):
472         u = uri.from_string(newuri)
473         self.failUnless(isinstance(u, uri.CHKFileURI))
474         self.failUnless(isinstance(u.get_storage_index(), str))
475         self.failUnlessEqual(len(u.get_storage_index()), 16)
476         self.failUnless(isinstance(u.key, str))
477         self.failUnlessEqual(len(u.key), 16)
478         self.failUnlessEqual(u.size, size)
479
480     def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
481         p = {"k": k,
482              "happy": happy,
483              "n": n,
484              "max_segment_size": max_segsize,
485              }
486         self.node.DEFAULT_ENCODING_PARAMETERS = p
487
488     def test_one_each(self):
489         # if we have 50 shares, and there are 50 peers, and they all accept a
490         # share, we should get exactly one share per peer
491
492         self.make_client()
493         data = self.get_data(SIZE_LARGE)
494         self.set_encoding_parameters(25, 30, 50)
495         d = upload_data(self.u, data)
496         d.addCallback(extract_uri)
497         d.addCallback(self._check_large, SIZE_LARGE)
498         def _check(res):
499             for p in self.node.last_peers:
500                 allocated = p.allocated
501                 self.failUnlessEqual(len(allocated), 1)
502                 self.failUnlessEqual(p.queries, 1)
503         d.addCallback(_check)
504         return d
505
506     def test_two_each(self):
507         # if we have 100 shares, and there are 50 peers, and they all accept
508         # all shares, we should get exactly two shares per peer
509
510         self.make_client()
511         data = self.get_data(SIZE_LARGE)
512         self.set_encoding_parameters(50, 75, 100)
513         d = upload_data(self.u, data)
514         d.addCallback(extract_uri)
515         d.addCallback(self._check_large, SIZE_LARGE)
516         def _check(res):
517             for p in self.node.last_peers:
518                 allocated = p.allocated
519                 self.failUnlessEqual(len(allocated), 2)
520                 self.failUnlessEqual(p.queries, 2)
521         d.addCallback(_check)
522         return d
523
524     def test_one_each_plus_one_extra(self):
525         # if we have 51 shares, and there are 50 peers, then one peer gets
526         # two shares and the rest get just one
527
528         self.make_client()
529         data = self.get_data(SIZE_LARGE)
530         self.set_encoding_parameters(24, 41, 51)
531         d = upload_data(self.u, data)
532         d.addCallback(extract_uri)
533         d.addCallback(self._check_large, SIZE_LARGE)
534         def _check(res):
535             got_one = []
536             got_two = []
537             for p in self.node.last_peers:
538                 allocated = p.allocated
539                 self.failUnless(len(allocated) in (1,2), len(allocated))
540                 if len(allocated) == 1:
541                     self.failUnlessEqual(p.queries, 1)
542                     got_one.append(p)
543                 else:
544                     self.failUnlessEqual(p.queries, 2)
545                     got_two.append(p)
546             self.failUnlessEqual(len(got_one), 49)
547             self.failUnlessEqual(len(got_two), 1)
548         d.addCallback(_check)
549         return d
550
551     def test_four_each(self):
552         # if we have 200 shares, and there are 50 peers, then each peer gets
553         # 4 shares. The design goal is to accomplish this with only two
554         # queries per peer.
555
556         self.make_client()
557         data = self.get_data(SIZE_LARGE)
558         self.set_encoding_parameters(100, 150, 200)
559         d = upload_data(self.u, data)
560         d.addCallback(extract_uri)
561         d.addCallback(self._check_large, SIZE_LARGE)
562         def _check(res):
563             for p in self.node.last_peers:
564                 allocated = p.allocated
565                 self.failUnlessEqual(len(allocated), 4)
566                 self.failUnlessEqual(p.queries, 2)
567         d.addCallback(_check)
568         return d
569
570     def test_three_of_ten(self):
571         # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
572         # 4+4+2
573
574         self.make_client(3)
575         data = self.get_data(SIZE_LARGE)
576         self.set_encoding_parameters(3, 5, 10)
577         d = upload_data(self.u, data)
578         d.addCallback(extract_uri)
579         d.addCallback(self._check_large, SIZE_LARGE)
580         def _check(res):
581             counts = {}
582             for p in self.node.last_peers:
583                 allocated = p.allocated
584                 counts[len(allocated)] = counts.get(len(allocated), 0) + 1
585             histogram = [counts.get(i, 0) for i in range(5)]
586             self.failUnlessEqual(histogram, [0,0,0,2,1])
587         d.addCallback(_check)
588         return d
589
590     def test_some_big_some_small(self):
591         # 10 shares, 20 servers, but half the servers don't support a
592         # share-size large enough for our file
593         mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)])
594         self.node = FakeClient(mode, num_servers=20)
595         self.u = upload.Uploader()
596         self.u.running = True
597         self.u.parent = self.node
598
599         data = self.get_data(SIZE_LARGE)
600         self.set_encoding_parameters(3, 5, 10)
601         d = upload_data(self.u, data)
602         d.addCallback(extract_uri)
603         d.addCallback(self._check_large, SIZE_LARGE)
604         def _check(res):
605             # we should have put one share each on the big peers, and zero
606             # shares on the small peers
607             total_allocated = 0
608             for p in self.node.last_peers:
609                 if p.mode == "good":
610                     self.failUnlessEqual(len(p.allocated), 1)
611                 elif p.mode == "small":
612                     self.failUnlessEqual(len(p.allocated), 0)
613                 total_allocated += len(p.allocated)
614             self.failUnlessEqual(total_allocated, 10)
615         d.addCallback(_check)
616         return d
617
618
619 class StorageIndex(unittest.TestCase):
620     def test_params_must_matter(self):
621         DATA = "I am some data"
622         u = upload.Data(DATA, convergence="")
623         eu = upload.EncryptAnUploadable(u)
624         d1 = eu.get_storage_index()
625
626         # CHK means the same data should encrypt the same way
627         u = upload.Data(DATA, convergence="")
628         eu = upload.EncryptAnUploadable(u)
629         d1a = eu.get_storage_index()
630
631         # but if we use a different convergence string it should be different
632         u = upload.Data(DATA, convergence="wheee!")
633         eu = upload.EncryptAnUploadable(u)
634         d1salt1 = eu.get_storage_index()
635
636         # and if we add yet a different convergence it should be different again
637         u = upload.Data(DATA, convergence="NOT wheee!")
638         eu = upload.EncryptAnUploadable(u)
639         d1salt2 = eu.get_storage_index()
640
641         # and if we use the first string again it should be the same as last time
642         u = upload.Data(DATA, convergence="wheee!")
643         eu = upload.EncryptAnUploadable(u)
644         d1salt1a = eu.get_storage_index()
645
646         # and if we change the encoding parameters, it should be different (from the same convergence string with different encoding parameters)
647         u = upload.Data(DATA, convergence="")
648         u.encoding_param_k = u.default_encoding_param_k + 1
649         eu = upload.EncryptAnUploadable(u)
650         d2 = eu.get_storage_index()
651
652         # and if we use a random key, it should be different than the CHK
653         u = upload.Data(DATA, convergence=None)
654         eu = upload.EncryptAnUploadable(u)
655         d3 = eu.get_storage_index()
656         # and different from another instance
657         u = upload.Data(DATA, convergence=None)
658         eu = upload.EncryptAnUploadable(u)
659         d4 = eu.get_storage_index()
660
661         d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
662         def _done(res):
663             si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
664             self.failUnlessEqual(si1, si1a)
665             self.failIfEqual(si1, si2)
666             self.failIfEqual(si1, si3)
667             self.failIfEqual(si1, si4)
668             self.failIfEqual(si3, si4)
669             self.failIfEqual(si1salt1, si1)
670             self.failIfEqual(si1salt1, si1salt2)
671             self.failIfEqual(si1salt2, si1)
672             self.failUnlessEqual(si1salt1, si1salt1a)
673         d.addCallback(_done)
674         return d
675
676 class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
677     ShouldFailMixin):
678     def _do_upload_with_broken_servers(self, servers_to_break):
679         """
680         I act like a normal upload, but before I send the results of
681         Tahoe2PeerSelector to the Encoder, I break the first servers_to_break
682         PeerTrackers in the used_peers part of the return result.
683         """
684         assert self.g, "I tried to find a grid at self.g, but failed"
685         broker = self.g.clients[0].storage_broker
686         sh     = self.g.clients[0]._secret_holder
687         data = upload.Data("data" * 10000, convergence="")
688         data.encoding_param_k = 3
689         data.encoding_param_happy = 4
690         data.encoding_param_n = 10
691         uploadable = upload.EncryptAnUploadable(data)
692         encoder = encode.Encoder()
693         encoder.set_encrypted_uploadable(uploadable)
694         status = upload.UploadStatus()
695         selector = upload.Tahoe2PeerSelector("dglev", "test", status)
696         storage_index = encoder.get_param("storage_index")
697         share_size = encoder.get_param("share_size")
698         block_size = encoder.get_param("block_size")
699         num_segments = encoder.get_param("num_segments")
700         d = selector.get_shareholders(broker, sh, storage_index,
701                                       share_size, block_size, num_segments,
702                                       10, 4)
703         def _have_shareholders((used_peers, already_peers)):
704             assert servers_to_break <= len(used_peers)
705             for index in xrange(servers_to_break):
706                 server = list(used_peers)[index]
707                 for share in server.buckets.keys():
708                     server.buckets[share].abort()
709             buckets = {}
710             servermap = already_peers.copy()
711             for peer in used_peers:
712                 buckets.update(peer.buckets)
713                 for bucket in peer.buckets:
714                     servermap[bucket] = peer.peerid
715             encoder.set_shareholders(buckets, servermap)
716             d = encoder.start()
717             return d
718         d.addCallback(_have_shareholders)
719         return d
720
721
722     def _add_server(self, server_number, readonly=False):
723         assert self.g, "I tried to find a grid at self.g, but failed"
724         assert self.shares, "I tried to find shares at self.shares, but failed"
725         ss = self.g.make_server(server_number, readonly)
726         self.g.add_server(server_number, ss)
727
728
729     def _add_server_with_share(self, server_number, share_number=None,
730                                readonly=False):
731         self._add_server(server_number, readonly)
732         if share_number is not None:
733             self._copy_share_to_server(share_number, server_number)
734
735
736     def _copy_share_to_server(self, share_number, server_number):
737         ss = self.g.servers_by_number[server_number]
738         # Copy share i from the directory associated with the first 
739         # storage server to the directory associated with this one.
740         assert self.g, "I tried to find a grid at self.g, but failed"
741         assert self.shares, "I tried to find shares at self.shares, but failed"
742         old_share_location = self.shares[share_number][2]
743         new_share_location = os.path.join(ss.storedir, "shares")
744         si = uri.from_string(self.uri).get_storage_index()
745         new_share_location = os.path.join(new_share_location,
746                                           storage_index_to_dir(si))
747         if not os.path.exists(new_share_location):
748             os.makedirs(new_share_location)
749         new_share_location = os.path.join(new_share_location,
750                                           str(share_number))
751         if old_share_location != new_share_location:
752             shutil.copy(old_share_location, new_share_location)
753         shares = self.find_shares(self.uri)
754         # Make sure that the storage server has the share.
755         self.failUnless((share_number, ss.my_nodeid, new_share_location)
756                         in shares)
757
758
759     def _setup_and_upload(self):
760         """
761         I set up a NoNetworkGrid with a single server and client,
762         upload a file to it, store its uri in self.uri, and store its
763         sharedata in self.shares.
764         """
765         self.set_up_grid(num_clients=1, num_servers=1)
766         client = self.g.clients[0]
767         client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
768         data = upload.Data("data" * 10000, convergence="")
769         self.data = data
770         d = client.upload(data)
771         def _store_uri(ur):
772             self.uri = ur.uri
773         d.addCallback(_store_uri)
774         d.addCallback(lambda ign:
775             self.find_shares(self.uri))
776         def _store_shares(shares):
777             self.shares = shares
778         d.addCallback(_store_shares)
779         return d
780
781
782     def test_configure_parameters(self):
783         self.basedir = self.mktemp()
784         hooks = {0: self._set_up_nodes_extra_config}
785         self.set_up_grid(client_config_hooks=hooks)
786         c0 = self.g.clients[0]
787
788         DATA = "data" * 100
789         u = upload.Data(DATA, convergence="")
790         d = c0.upload(u)
791         d.addCallback(lambda ur: c0.create_node_from_uri(ur.uri))
792         m = monitor.Monitor()
793         d.addCallback(lambda fn: fn.check(m))
794         def _check(cr):
795             data = cr.get_data()
796             self.failUnlessEqual(data["count-shares-needed"], 7)
797             self.failUnlessEqual(data["count-shares-expected"], 12)
798         d.addCallback(_check)
799         return d
800
801
802     def _setUp(self, ns):
803         # Used by test_happy_semantics and test_prexisting_share_behavior
804         # to set up the grid.
805         self.node = FakeClient(mode="good", num_servers=ns)
806         self.u = upload.Uploader()
807         self.u.running = True
808         self.u.parent = self.node
809
810
811     def test_happy_semantics(self):
812         self._setUp(2)
813         DATA = upload.Data("kittens" * 10000, convergence="")
814         # These parameters are unsatisfiable with the client that we've made
815         # -- we'll use them to test that the semnatics work correctly.
816         self.set_encoding_parameters(k=3, happy=5, n=10)
817         d = self.shouldFail(UploadUnhappinessError, "test_happy_semantics",
818                             "shares could only be placed on 2 servers "
819                             "(5 were requested)",
820                             self.u.upload, DATA)
821         # Let's reset the client to have 10 servers
822         d.addCallback(lambda ign:
823             self._setUp(10))
824         # These parameters are satisfiable with the client we've made.
825         d.addCallback(lambda ign:
826             self.set_encoding_parameters(k=3, happy=5, n=10))
827         # this should work
828         d.addCallback(lambda ign:
829             self.u.upload(DATA))
830         # Let's reset the client to have 7 servers
831         # (this is less than n, but more than h)
832         d.addCallback(lambda ign:
833             self._setUp(7))
834         # These encoding parameters should still be satisfiable with our 
835         # client setup
836         d.addCallback(lambda ign:
837             self.set_encoding_parameters(k=3, happy=5, n=10))
838         # This, then, should work.
839         d.addCallback(lambda ign:
840             self.u.upload(DATA))
841         return d
842
843
844     def test_problem_layout_comment_52(self):
845         def _basedir():
846             self.basedir = self.mktemp()
847         _basedir()
848         # This scenario is at 
849         # http://allmydata.org/trac/tahoe/ticket/778#comment:52
850         #
851         # The scenario in comment:52 proposes that we have a layout
852         # like:
853         # server 1: share 1
854         # server 2: share 1
855         # server 3: share 1
856         # server 4: shares 2 - 10
857         # To get access to the shares, we will first upload to one 
858         # server, which will then have shares 1 - 10. We'll then 
859         # add three new servers, configure them to not accept any new
860         # shares, then write share 1 directly into the serverdir of each.
861         # Then each of servers 1 - 3 will report that they have share 1, 
862         # and will not accept any new share, while server 4 will report that
863         # it has shares 2 - 10 and will accept new shares.
864         # We'll then set 'happy' = 4, and see that an upload fails
865         # (as it should)
866         d = self._setup_and_upload()
867         d.addCallback(lambda ign:
868             self._add_server_with_share(1, 0, True))
869         d.addCallback(lambda ign:
870             self._add_server_with_share(2, 0, True))
871         d.addCallback(lambda ign:
872             self._add_server_with_share(3, 0, True))
873         # Remove the first share from server 0.
874         def _remove_share_0():
875             share_location = self.shares[0][2]
876             os.remove(share_location)
877         d.addCallback(lambda ign:
878             _remove_share_0())
879         # Set happy = 4 in the client.
880         def _prepare():
881             client = self.g.clients[0]
882             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
883             return client
884         d.addCallback(lambda ign:
885             _prepare())
886         # Uploading data should fail
887         d.addCallback(lambda client:
888             self.shouldFail(UploadUnhappinessError, "test_happy_semantics",
889                             "shares could only be placed on 2 servers "
890                             "(4 were requested)",
891                             client.upload, upload.Data("data" * 10000,
892                                                        convergence="")))
893
894         # Do comment:52, but like this:
895         # server 2: empty
896         # server 3: share 0, read-only
897         # server 1: share 0, read-only
898         # server 0: shares 0-9
899         d.addCallback(lambda ign:
900             _basedir())
901         d.addCallback(lambda ign:
902             self._setup_and_upload())
903         d.addCallback(lambda ign:
904             self._add_server_with_share(server_number=2))
905         d.addCallback(lambda ign:
906             self._add_server_with_share(server_number=3, share_number=0,
907                                         readonly=True))
908         d.addCallback(lambda ign:
909             self._add_server_with_share(server_number=1, share_number=0,
910                                         readonly=True))
911         def _prepare2():
912             client = self.g.clients[0]
913             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 3
914             return client
915         d.addCallback(lambda ign:
916             _prepare2())
917         d.addCallback(lambda client:
918             self.shouldFail(UploadUnhappinessError, "test_happy_sematics",
919                             "shares could only be placed on 2 servers "
920                             "(3 were requested)",
921                             client.upload, upload.Data("data" * 10000,
922                                                        convergence="")))
923         return d
924
925
926     def test_problem_layout_comment_53(self):
927         # This scenario is at
928         # http://allmydata.org/trac/tahoe/ticket/778#comment:53
929         #
930         # Set up the grid to have one server
931         def _change_basedir(ign):
932             self.basedir = self.mktemp()
933         _change_basedir(None)
934         d = self._setup_and_upload()
935         # We start by uploading all of the shares to one server (which has 
936         # already been done above).
937         # Next, we'll add three new servers to our NoNetworkGrid. We'll add
938         # one share from our initial upload to each of these.
939         # The counterintuitive ordering of the share numbers is to deal with 
940         # the permuting of these servers -- distributing the shares this 
941         # way ensures that the Tahoe2PeerSelector sees them in the order 
942         # described above.
943         d.addCallback(lambda ign:
944             self._add_server_with_share(server_number=1, share_number=2))
945         d.addCallback(lambda ign:
946             self._add_server_with_share(server_number=2, share_number=0))
947         d.addCallback(lambda ign:
948             self._add_server_with_share(server_number=3, share_number=1))
949         # So, we now have the following layout:
950         # server 0: shares 0 - 9
951         # server 1: share 2
952         # server 2: share 0
953         # server 3: share 1
954         # We want to change the 'happy' parameter in the client to 4. 
955         # The Tahoe2PeerSelector will see the peers permuted as:
956         # 2, 3, 1, 0
957         # Ideally, a reupload of our original data should work.
958         def _reset_encoding_parameters(ign, happy=4):
959             client = self.g.clients[0]
960             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
961             return client
962         d.addCallback(_reset_encoding_parameters)
963         d.addCallback(lambda client:
964             client.upload(upload.Data("data" * 10000, convergence="")))
965
966
967         # This scenario is basically comment:53, but with the order reversed;
968         # this means that the Tahoe2PeerSelector sees
969         # server 2: shares 1-10
970         # server 3: share 1
971         # server 1: share 2
972         # server 4: share 3
973         d.addCallback(_change_basedir)
974         d.addCallback(lambda ign:
975             self._setup_and_upload())
976         d.addCallback(lambda ign:
977             self._add_server_with_share(server_number=2, share_number=0))
978         d.addCallback(lambda ign:
979             self._add_server_with_share(server_number=3, share_number=1))
980         d.addCallback(lambda ign:
981             self._add_server_with_share(server_number=1, share_number=2))
982         # Copy all of the other shares to server number 2
983         def _copy_shares(ign):
984             for i in xrange(1, 10):
985                 self._copy_share_to_server(i, 2)
986         d.addCallback(_copy_shares)
987         # Remove the first server, and add a placeholder with share 0
988         d.addCallback(lambda ign:
989             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
990         d.addCallback(lambda ign:
991             self._add_server_with_share(server_number=4, share_number=0))
992         # Now try uploading. 
993         d.addCallback(_reset_encoding_parameters)
994         d.addCallback(lambda client:
995             client.upload(upload.Data("data" * 10000, convergence="")))
996         # Try the same thing, but with empty servers after the first one
997         # We want to make sure that Tahoe2PeerSelector will redistribute
998         # shares as necessary, not simply discover an existing layout.
999         d.addCallback(_change_basedir)
1000         d.addCallback(lambda ign:
1001             self._setup_and_upload())
1002         d.addCallback(lambda ign:
1003             self._add_server(server_number=2))
1004         d.addCallback(lambda ign:
1005             self._add_server(server_number=3))
1006         d.addCallback(lambda ign:
1007             self._add_server(server_number=1))
1008         d.addCallback(_copy_shares)
1009         d.addCallback(lambda ign:
1010             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1011         d.addCallback(lambda ign:
1012             self._add_server(server_number=4))
1013         d.addCallback(_reset_encoding_parameters)
1014         d.addCallback(lambda client:
1015             client.upload(upload.Data("data" * 10000, convergence="")))
1016         return d
1017
1018
1019     def test_happiness_with_some_readonly_peers(self):
1020         # Try the following layout
1021         # server 2: shares 0-9
1022         # server 4: share 0, read-only
1023         # server 3: share 1, read-only
1024         # server 1: share 2, read-only
1025         self.basedir = self.mktemp()
1026         d = self._setup_and_upload()
1027         d.addCallback(lambda ign:
1028             self._add_server_with_share(server_number=2, share_number=0))
1029         d.addCallback(lambda ign:
1030             self._add_server_with_share(server_number=3, share_number=1,
1031                                         readonly=True))
1032         d.addCallback(lambda ign:
1033             self._add_server_with_share(server_number=1, share_number=2,
1034                                         readonly=True))
1035         # Copy all of the other shares to server number 2
1036         def _copy_shares(ign):
1037             for i in xrange(1, 10):
1038                 self._copy_share_to_server(i, 2)
1039         d.addCallback(_copy_shares)
1040         # Remove server 0, and add another in its place
1041         d.addCallback(lambda ign:
1042             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1043         d.addCallback(lambda ign:
1044             self._add_server_with_share(server_number=4, share_number=0,
1045                                         readonly=True))
1046         def _reset_encoding_parameters(ign, happy=4):
1047             client = self.g.clients[0]
1048             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1049             return client
1050         d.addCallback(_reset_encoding_parameters)
1051         d.addCallback(lambda client:
1052             client.upload(upload.Data("data" * 10000, convergence="")))
1053         return d
1054
1055
1056     def test_happiness_with_all_readonly_peers(self):
1057         # server 3: share 1, read-only
1058         # server 1: share 2, read-only
1059         # server 2: shares 0-9, read-only
1060         # server 4: share 0, read-only
1061         # The idea with this test is to make sure that the survey of
1062         # read-only peers doesn't undercount servers of happiness
1063         self.basedir = self.mktemp()
1064         d = self._setup_and_upload()
1065         d.addCallback(lambda ign:
1066             self._add_server_with_share(server_number=4, share_number=0,
1067                                         readonly=True))
1068         d.addCallback(lambda ign:
1069             self._add_server_with_share(server_number=3, share_number=1,
1070                                         readonly=True))
1071         d.addCallback(lambda ign:
1072             self._add_server_with_share(server_number=1, share_number=2,
1073                                         readonly=True))
1074         d.addCallback(lambda ign:
1075             self._add_server_with_share(server_number=2, share_number=0,
1076                                         readonly=True))
1077         def _copy_shares(ign):
1078             for i in xrange(1, 10):
1079                 self._copy_share_to_server(i, 2)
1080         d.addCallback(_copy_shares)
1081         d.addCallback(lambda ign:
1082             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1083         def _reset_encoding_parameters(ign, happy=4):
1084             client = self.g.clients[0]
1085             client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
1086             return client
1087         d.addCallback(_reset_encoding_parameters)
1088         d.addCallback(lambda client:
1089             client.upload(upload.Data("data" * 10000, convergence="")))
1090         return d
1091
1092
1093     def test_dropped_servers_in_encoder(self):
1094         def _set_basedir(ign=None):
1095             self.basedir = self.mktemp()
1096         _set_basedir()
1097         d = self._setup_and_upload();
1098         # Add 5 servers
1099         def _do_server_setup(ign):
1100             self._add_server_with_share(1)
1101             self._add_server_with_share(2)
1102             self._add_server_with_share(3)
1103             self._add_server_with_share(4)
1104             self._add_server_with_share(5)
1105         d.addCallback(_do_server_setup)
1106         # remove the original server
1107         # (necessary to ensure that the Tahoe2PeerSelector will distribute
1108         #  all the shares)
1109         def _remove_server(ign):
1110             server = self.g.servers_by_number[0]
1111             self.g.remove_server(server.my_nodeid)
1112         d.addCallback(_remove_server)
1113         # This should succeed.
1114         d.addCallback(lambda ign:
1115             self._do_upload_with_broken_servers(1))
1116         # Now, do the same thing over again, but drop 2 servers instead
1117         # of 1. This should fail.
1118         d.addCallback(_set_basedir)
1119         d.addCallback(lambda ign:
1120             self._setup_and_upload())
1121         d.addCallback(_do_server_setup)
1122         d.addCallback(_remove_server)
1123         d.addCallback(lambda ign:
1124             self.shouldFail(UploadUnhappinessError,
1125                             "test_dropped_servers_in_encoder",
1126                             "lost too many servers during upload "
1127                             "(still have 3, want 4)",
1128                             self._do_upload_with_broken_servers, 2))
1129         # Now do the same thing over again, but make some of the servers
1130         # readonly, break some of the ones that aren't, and make sure that
1131         # happiness accounting is preserved.
1132         d.addCallback(_set_basedir)
1133         d.addCallback(lambda ign:
1134             self._setup_and_upload())
1135         def _do_server_setup_2(ign):
1136             self._add_server_with_share(1)
1137             self._add_server_with_share(2)
1138             self._add_server_with_share(3)
1139             self._add_server_with_share(4, 7, readonly=True)
1140             self._add_server_with_share(5, 8, readonly=True)
1141         d.addCallback(_do_server_setup_2)
1142         d.addCallback(_remove_server)
1143         d.addCallback(lambda ign:
1144             self._do_upload_with_broken_servers(1))
1145         d.addCallback(_set_basedir)
1146         d.addCallback(lambda ign:
1147             self._setup_and_upload())
1148         d.addCallback(_do_server_setup_2)
1149         d.addCallback(_remove_server)
1150         d.addCallback(lambda ign:
1151             self.shouldFail(UploadUnhappinessError,
1152                             "test_dropped_servers_in_encoder",
1153                             "lost too many servers during upload "
1154                             "(still have 3, want 4)",
1155                             self._do_upload_with_broken_servers, 2))
1156         return d
1157
1158
1159     def test_servers_with_unique_shares(self):
1160         # servers_with_unique_shares expects a dict of 
1161         # shnum => peerid as a preexisting shares argument.
1162         test1 = {
1163                  1 : "server1",
1164                  2 : "server2",
1165                  3 : "server3",
1166                  4 : "server4"
1167                 }
1168         unique_servers = upload.servers_with_unique_shares(test1)
1169         self.failUnlessEqual(4, len(unique_servers))
1170         for server in ["server1", "server2", "server3", "server4"]:
1171             self.failUnlessIn(server, unique_servers)
1172         test1[4] = "server1"
1173         # Now there should only be 3 unique servers.
1174         unique_servers = upload.servers_with_unique_shares(test1)
1175         self.failUnlessEqual(3, len(unique_servers))
1176         for server in ["server1", "server2", "server3"]:
1177             self.failUnlessIn(server, unique_servers)
1178         # servers_with_unique_shares expects to receive some object with
1179         # a peerid attribute. So we make a FakePeerTracker whose only
1180         # job is to have a peerid attribute.
1181         class FakePeerTracker:
1182             pass
1183         trackers = []
1184         for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1185             t = FakePeerTracker()
1186             t.peerid = server
1187             t.buckets = [i]
1188             trackers.append(t)
1189         # Recall that there are 3 unique servers in test1. Since none of
1190         # those overlap with the ones in trackers, we should get 7 back
1191         unique_servers = upload.servers_with_unique_shares(test1, set(trackers))
1192         self.failUnlessEqual(7, len(unique_servers))
1193         expected_servers = ["server" + str(i) for i in xrange(1, 9)]
1194         expected_servers.remove("server4")
1195         for server in expected_servers:
1196             self.failUnlessIn(server, unique_servers)
1197         # Now add an overlapping server to trackers.
1198         t = FakePeerTracker()
1199         t.peerid = "server1"
1200         t.buckets = [1]
1201         trackers.append(t)
1202         unique_servers = upload.servers_with_unique_shares(test1, set(trackers))
1203         self.failUnlessEqual(7, len(unique_servers))
1204         for server in expected_servers:
1205             self.failUnlessIn(server, unique_servers)
1206         test = {}
1207         unique_servers = upload.servers_with_unique_shares(test)
1208         self.failUnlessEqual(0, len(test))
1209
1210
1211     def test_shares_by_server(self):
1212         test = dict([(i, "server%d" % i) for i in xrange(1, 5)])
1213         shares_by_server = upload.shares_by_server(test)
1214         self.failUnlessEqual(set([1]), shares_by_server["server1"])
1215         self.failUnlessEqual(set([2]), shares_by_server["server2"])
1216         self.failUnlessEqual(set([3]), shares_by_server["server3"])
1217         self.failUnlessEqual(set([4]), shares_by_server["server4"])
1218         test1 = {
1219                     1 : "server1",
1220                     2 : "server1",
1221                     3 : "server1",
1222                     4 : "server2",
1223                     5 : "server2"
1224                 }
1225         shares_by_server = upload.shares_by_server(test1)
1226         self.failUnlessEqual(set([1, 2, 3]), shares_by_server["server1"])
1227         self.failUnlessEqual(set([4, 5]), shares_by_server["server2"])
1228
1229
1230     def test_existing_share_detection(self):
1231         self.basedir = self.mktemp()
1232         d = self._setup_and_upload()
1233         # Our final setup should look like this:
1234         # server 1: shares 1 - 10, read-only
1235         # server 2: empty
1236         # server 3: empty
1237         # server 4: empty
1238         # The purpose of this test is to make sure that the peer selector
1239         # knows about the shares on server 1, even though it is read-only.
1240         # It used to simply filter these out, which would cause the test
1241         # to fail when servers_of_happiness = 4.
1242         d.addCallback(lambda ign:
1243             self._add_server_with_share(1, 0, True))
1244         d.addCallback(lambda ign:
1245             self._add_server_with_share(2))
1246         d.addCallback(lambda ign:
1247             self._add_server_with_share(3))
1248         d.addCallback(lambda ign:
1249             self._add_server_with_share(4))
1250         def _copy_shares(ign):
1251             for i in xrange(1, 10):
1252                 self._copy_share_to_server(i, 1)
1253         d.addCallback(_copy_shares)
1254         d.addCallback(lambda ign:
1255             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1256         def _prepare_client(ign):
1257             client = self.g.clients[0]
1258             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1259             return client
1260         d.addCallback(_prepare_client)
1261         d.addCallback(lambda client:
1262             client.upload(upload.Data("data" * 10000, convergence="")))
1263         return d
1264
1265
1266     def test_should_add_server(self):
1267         shares = dict([(i, "server%d" % i) for i in xrange(10)])
1268         self.failIf(upload.should_add_server(shares, "server1", 4))
1269         shares[4] = "server1"
1270         self.failUnless(upload.should_add_server(shares, "server4", 4))
1271         shares = {}
1272         self.failUnless(upload.should_add_server(shares, "server1", 1))
1273
1274
1275     def test_exception_messages_during_peer_selection(self):
1276         # server 1: readonly, no shares
1277         # server 2: readonly, no shares
1278         # server 3: readonly, no shares
1279         # server 4: readonly, no shares
1280         # server 5: readonly, no shares
1281         # This will fail, but we want to make sure that the log messages
1282         # are informative about why it has failed.
1283         self.basedir = self.mktemp()
1284         d = self._setup_and_upload()
1285         d.addCallback(lambda ign:
1286             self._add_server_with_share(server_number=1, readonly=True))
1287         d.addCallback(lambda ign:
1288             self._add_server_with_share(server_number=2, readonly=True))
1289         d.addCallback(lambda ign:
1290             self._add_server_with_share(server_number=3, readonly=True))
1291         d.addCallback(lambda ign:
1292             self._add_server_with_share(server_number=4, readonly=True))
1293         d.addCallback(lambda ign:
1294             self._add_server_with_share(server_number=5, readonly=True))
1295         d.addCallback(lambda ign:
1296             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1297         def _reset_encoding_parameters(ign):
1298             client = self.g.clients[0]
1299             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1300             return client
1301         d.addCallback(_reset_encoding_parameters)
1302         d.addCallback(lambda client:
1303             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1304                             "peer selection failed for <Tahoe2PeerSelector "
1305                             "for upload dglev>: placed 0 shares out of 10 "
1306                             "total (10 homeless), want to place on 4 servers,"
1307                             " sent 5 queries to 5 peers, 0 queries placed "
1308                             "some shares, 5 placed none "
1309                             "(of which 5 placed none due to the server being "
1310                             "full and 0 placed none due to an error)",
1311                             client.upload,
1312                             upload.Data("data" * 10000, convergence="")))
1313
1314
1315         # server 1: readonly, no shares
1316         # server 2: broken, no shares
1317         # server 3: readonly, no shares
1318         # server 4: readonly, no shares
1319         # server 5: readonly, no shares
1320         def _reset(ign):
1321             self.basedir = self.mktemp()
1322         d.addCallback(_reset)
1323         d.addCallback(lambda ign:
1324             self._setup_and_upload())
1325         d.addCallback(lambda ign:
1326             self._add_server_with_share(server_number=1, readonly=True))
1327         d.addCallback(lambda ign:
1328             self._add_server_with_share(server_number=2))
1329         def _break_server_2(ign):
1330             server = self.g.servers_by_number[2].my_nodeid
1331             # We have to break the server in servers_by_id, 
1332             # because the ones in servers_by_number isn't wrapped,
1333             # and doesn't look at its broken attribute
1334             self.g.servers_by_id[server].broken = True
1335         d.addCallback(_break_server_2)
1336         d.addCallback(lambda ign:
1337             self._add_server_with_share(server_number=3, readonly=True))
1338         d.addCallback(lambda ign:
1339             self._add_server_with_share(server_number=4, readonly=True))
1340         d.addCallback(lambda ign:
1341             self._add_server_with_share(server_number=5, readonly=True))
1342         d.addCallback(lambda ign:
1343             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1344         def _reset_encoding_parameters(ign):
1345             client = self.g.clients[0]
1346             client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
1347             return client
1348         d.addCallback(_reset_encoding_parameters)
1349         d.addCallback(lambda client:
1350             self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1351                             "peer selection failed for <Tahoe2PeerSelector "
1352                             "for upload dglev>: placed 0 shares out of 10 "
1353                             "total (10 homeless), want to place on 4 servers,"
1354                             " sent 5 queries to 5 peers, 0 queries placed "
1355                             "some shares, 5 placed none "
1356                             "(of which 4 placed none due to the server being "
1357                             "full and 1 placed none due to an error)",
1358                             client.upload,
1359                             upload.Data("data" * 10000, convergence="")))
1360         return d
1361
1362
1363     def _set_up_nodes_extra_config(self, clientdir):
1364         cfgfn = os.path.join(clientdir, "tahoe.cfg")
1365         oldcfg = open(cfgfn, "r").read()
1366         f = open(cfgfn, "wt")
1367         f.write(oldcfg)
1368         f.write("\n")
1369         f.write("[client]\n")
1370         f.write("shares.needed = 7\n")
1371         f.write("shares.total = 12\n")
1372         f.write("\n")
1373         f.close()
1374         return None
1375
1376 # TODO:
1377 #  upload with exactly 75 peers (shares_of_happiness)
1378 #  have a download fail
1379 #  cancel a download (need to implement more cancel stuff)