]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_upload.py
upload: make peer-selection a bit more uniform. Closes #132.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_upload.py
1
2 import os
3 from twisted.trial import unittest
4 from twisted.python.failure import Failure
5 from twisted.internet import defer
6 from cStringIO import StringIO
7
8 from allmydata import upload, encode, uri
9 from allmydata.interfaces import IFileURI
10 from allmydata.util.assertutil import precondition
11 from foolscap import eventual
12
13 class Uploadable(unittest.TestCase):
14     def shouldEqual(self, data, expected):
15         self.failUnless(isinstance(data, list))
16         for e in data:
17             self.failUnless(isinstance(e, str))
18         s = "".join(data)
19         self.failUnlessEqual(s, expected)
20
21     def test_filehandle(self):
22         s = StringIO("a"*41)
23         u = upload.FileHandle(s)
24         d = u.get_size()
25         d.addCallback(self.failUnlessEqual, 41)
26         d.addCallback(lambda res: u.read(1))
27         d.addCallback(self.shouldEqual, "a")
28         d.addCallback(lambda res: u.read(80))
29         d.addCallback(self.shouldEqual, "a"*40)
30         d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
31         d.addCallback(lambda res: s.close()) # that privilege is reserved for us
32         return d
33
34     def test_filename(self):
35         basedir = "upload/Uploadable/test_filename"
36         os.makedirs(basedir)
37         fn = os.path.join(basedir, "file")
38         f = open(fn, "w")
39         f.write("a"*41)
40         f.close()
41         u = upload.FileName(fn)
42         d = u.get_size()
43         d.addCallback(self.failUnlessEqual, 41)
44         d.addCallback(lambda res: u.read(1))
45         d.addCallback(self.shouldEqual, "a")
46         d.addCallback(lambda res: u.read(80))
47         d.addCallback(self.shouldEqual, "a"*40)
48         d.addCallback(lambda res: u.close())
49         return d
50
51     def test_data(self):
52         s = "a"*41
53         u = upload.Data(s)
54         d = u.get_size()
55         d.addCallback(self.failUnlessEqual, 41)
56         d.addCallback(lambda res: u.read(1))
57         d.addCallback(self.shouldEqual, "a")
58         d.addCallback(lambda res: u.read(80))
59         d.addCallback(self.shouldEqual, "a"*40)
60         d.addCallback(lambda res: u.close())
61         return d
62
63 class FakePeer:
64     def __init__(self, mode="good"):
65         self.ss = FakeStorageServer(mode)
66
67     def callRemote(self, methname, *args, **kwargs):
68         def _call():
69             meth = getattr(self, methname)
70             return meth(*args, **kwargs)
71         return defer.maybeDeferred(_call)
72
73     def get_service(self, sname):
74         assert sname == "storageserver"
75         return self.ss
76
77 class FakeStorageServer:
78     def __init__(self, mode):
79         self.mode = mode
80         self.allocated = []
81         self.queries = 0
82     def callRemote(self, methname, *args, **kwargs):
83         def _call():
84             meth = getattr(self, methname)
85             return meth(*args, **kwargs)
86         d = eventual.fireEventually()
87         d.addCallback(lambda res: _call())
88         return d
89
90     def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
91                          sharenums, share_size, canary):
92         #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
93         self.queries += 1
94         if self.mode == "full":
95             return (set(), {},)
96         elif self.mode == "already got them":
97             return (set(sharenums), {},)
98         else:
99             for shnum in sharenums:
100                 self.allocated.append( (storage_index, shnum) )
101             return (set(),
102                     dict([( shnum, FakeBucketWriter(share_size) )
103                           for shnum in sharenums]),
104                     )
105
106 class FakeBucketWriter:
107     # a diagnostic version of storageserver.BucketWriter
108     def __init__(self, size):
109         self.data = StringIO()
110         self.closed = False
111         self._size = size
112
113     def callRemote(self, methname, *args, **kwargs):
114         def _call():
115             meth = getattr(self, "remote_" + methname)
116             return meth(*args, **kwargs)
117         d = eventual.fireEventually()
118         d.addCallback(lambda res: _call())
119         return d
120
121     def remote_write(self, offset, data):
122         precondition(not self.closed)
123         precondition(offset >= 0)
124         precondition(offset+len(data) <= self._size,
125                      "offset=%d + data=%d > size=%d" %
126                      (offset, len(data), self._size))
127         self.data.seek(offset)
128         self.data.write(data)
129
130     def remote_close(self):
131         precondition(not self.closed)
132         self.closed = True
133
134 class FakeClient:
135     def __init__(self, mode="good", num_servers=50):
136         self.mode = mode
137         self.num_servers = num_servers
138     def get_permuted_peers(self, storage_index, include_myself):
139         peers = [ ("%20d"%fakeid, "%20d"%fakeid, FakePeer(self.mode),)
140                   for fakeid in range(self.num_servers) ]
141         self.last_peers = [p[2] for p in peers]
142         return peers
143     def get_push_to_ourselves(self):
144         return None
145     def get_encoding_parameters(self):
146         return None
147
148     def get_renewal_secret(self):
149         return ""
150     def get_cancel_secret(self):
151         return ""
152
153 DATA = """
154 Once upon a time, there was a beautiful princess named Buttercup. She lived
155 in a magical land where every file was stored securely among millions of
156 machines, and nobody ever worried about their data being lost ever again.
157 The End.
158 """
159 assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
160
161 SIZE_ZERO = 0
162 SIZE_SMALL = 16
163 SIZE_LARGE = len(DATA)
164
165 class GoodServer(unittest.TestCase):
166     def setUp(self):
167         self.node = FakeClient(mode="good")
168         self.u = upload.Uploader()
169         self.u.running = True
170         self.u.parent = self.node
171
172     def _check_small(self, newuri, size):
173         u = IFileURI(newuri)
174         self.failUnless(isinstance(u, uri.LiteralFileURI))
175         self.failUnlessEqual(len(u.data), size)
176
177     def _check_large(self, newuri, size):
178         u = IFileURI(newuri)
179         self.failUnless(isinstance(u, uri.CHKFileURI))
180         self.failUnless(isinstance(u.storage_index, str))
181         self.failUnlessEqual(len(u.storage_index), 16)
182         self.failUnless(isinstance(u.key, str))
183         self.failUnlessEqual(len(u.key), 16)
184         self.failUnlessEqual(u.size, size)
185
186     def get_data(self, size):
187         return DATA[:size]
188
189     def test_data_zero(self):
190         data = self.get_data(SIZE_ZERO)
191         d = self.u.upload_data(data)
192         d.addCallback(self._check_small, SIZE_ZERO)
193         return d
194
195     def test_data_small(self):
196         data = self.get_data(SIZE_SMALL)
197         d = self.u.upload_data(data)
198         d.addCallback(self._check_small, SIZE_SMALL)
199         return d
200
201     def test_data_large(self):
202         data = self.get_data(SIZE_LARGE)
203         d = self.u.upload_data(data)
204         d.addCallback(self._check_large, SIZE_LARGE)
205         return d
206
207     def test_data_large_odd_segments(self):
208         data = self.get_data(SIZE_LARGE)
209         segsize = int(SIZE_LARGE / 2.5)
210         # we want 3 segments, since that's not a power of two
211         d = self.u.upload_data(data, {"max_segment_size": segsize})
212         d.addCallback(self._check_large, SIZE_LARGE)
213         return d
214
215     def test_filehandle_zero(self):
216         data = self.get_data(SIZE_ZERO)
217         d = self.u.upload_filehandle(StringIO(data))
218         d.addCallback(self._check_small, SIZE_ZERO)
219         return d
220
221     def test_filehandle_small(self):
222         data = self.get_data(SIZE_SMALL)
223         d = self.u.upload_filehandle(StringIO(data))
224         d.addCallback(self._check_small, SIZE_SMALL)
225         return d
226
227     def test_filehandle_large(self):
228         data = self.get_data(SIZE_LARGE)
229         d = self.u.upload_filehandle(StringIO(data))
230         d.addCallback(self._check_large, SIZE_LARGE)
231         return d
232
233     def test_filename_zero(self):
234         fn = "Uploader-test_filename_zero.data"
235         f = open(fn, "wb")
236         data = self.get_data(SIZE_ZERO)
237         f.write(data)
238         f.close()
239         d = self.u.upload_filename(fn)
240         d.addCallback(self._check_small, SIZE_ZERO)
241         return d
242
243     def test_filename_small(self):
244         fn = "Uploader-test_filename_small.data"
245         f = open(fn, "wb")
246         data = self.get_data(SIZE_SMALL)
247         f.write(data)
248         f.close()
249         d = self.u.upload_filename(fn)
250         d.addCallback(self._check_small, SIZE_SMALL)
251         return d
252
253     def test_filename_large(self):
254         fn = "Uploader-test_filename_large.data"
255         f = open(fn, "wb")
256         data = self.get_data(SIZE_LARGE)
257         f.write(data)
258         f.close()
259         d = self.u.upload_filename(fn)
260         d.addCallback(self._check_large, SIZE_LARGE)
261         return d
262
263 class FullServer(unittest.TestCase):
264     def setUp(self):
265         self.node = FakeClient(mode="full")
266         self.u = upload.Uploader()
267         self.u.running = True
268         self.u.parent = self.node
269
270     def _should_fail(self, f):
271         self.failUnless(isinstance(f, Failure) and f.check(encode.NotEnoughPeersError))
272
273     def test_data_large(self):
274         data = DATA
275         d = self.u.upload_data(data)
276         d.addBoth(self._should_fail)
277         return d
278
279 class PeerSelection(unittest.TestCase):
280
281     def make_client(self, num_servers=50):
282         self.node = FakeClient(mode="good", num_servers=num_servers)
283         self.u = upload.Uploader()
284         self.u.running = True
285         self.u.parent = self.node
286
287     def get_data(self, size):
288         return DATA[:size]
289
290     def _check_large(self, newuri, size):
291         u = IFileURI(newuri)
292         self.failUnless(isinstance(u, uri.CHKFileURI))
293         self.failUnless(isinstance(u.storage_index, str))
294         self.failUnlessEqual(len(u.storage_index), 16)
295         self.failUnless(isinstance(u.key, str))
296         self.failUnlessEqual(len(u.key), 16)
297         self.failUnlessEqual(u.size, size)
298
299     def test_one_each(self):
300         # if we have 50 shares, and there are 50 peers, and they all accept a
301         # share, we should get exactly one share per peer
302
303         self.make_client()
304         data = self.get_data(SIZE_LARGE)
305         self.u.DEFAULT_ENCODING_PARAMETERS = (25, 30, 50)
306         d = self.u.upload_data(data)
307         d.addCallback(self._check_large, SIZE_LARGE)
308         def _check(res):
309             for p in self.node.last_peers:
310                 allocated = p.ss.allocated
311                 self.failUnlessEqual(len(allocated), 1)
312                 self.failUnlessEqual(p.ss.queries, 1)
313         d.addCallback(_check)
314         return d
315
316     def test_two_each(self):
317         # if we have 100 shares, and there are 50 peers, and they all accept
318         # all shares, we should get exactly two shares per peer
319
320         self.make_client()
321         data = self.get_data(SIZE_LARGE)
322         self.u.DEFAULT_ENCODING_PARAMETERS = (50, 75, 100)
323         d = self.u.upload_data(data)
324         d.addCallback(self._check_large, SIZE_LARGE)
325         def _check(res):
326             for p in self.node.last_peers:
327                 allocated = p.ss.allocated
328                 self.failUnlessEqual(len(allocated), 2)
329                 self.failUnlessEqual(p.ss.queries, 2)
330         d.addCallback(_check)
331         return d
332
333     def test_one_each_plus_one_extra(self):
334         # if we have 51 shares, and there are 50 peers, then one peer gets
335         # two shares and the rest get just one
336
337         self.make_client()
338         data = self.get_data(SIZE_LARGE)
339         self.u.DEFAULT_ENCODING_PARAMETERS = (24, 41, 51)
340         d = self.u.upload_data(data)
341         d.addCallback(self._check_large, SIZE_LARGE)
342         def _check(res):
343             got_one = []
344             got_two = []
345             for p in self.node.last_peers:
346                 allocated = p.ss.allocated
347                 self.failUnless(len(allocated) in (1,2), len(allocated))
348                 if len(allocated) == 1:
349                     self.failUnlessEqual(p.ss.queries, 1)
350                     got_one.append(p)
351                 else:
352                     self.failUnlessEqual(p.ss.queries, 2)
353                     got_two.append(p)
354             self.failUnlessEqual(len(got_one), 49)
355             self.failUnlessEqual(len(got_two), 1)
356         d.addCallback(_check)
357         return d
358
359     def test_four_each(self):
360         # if we have 200 shares, and there are 50 peers, then each peer gets
361         # 4 shares. The design goal is to accomplish this with only two
362         # queries per peer.
363
364         self.make_client()
365         data = self.get_data(SIZE_LARGE)
366         self.u.DEFAULT_ENCODING_PARAMETERS = (100, 150, 200)
367         d = self.u.upload_data(data)
368         d.addCallback(self._check_large, SIZE_LARGE)
369         def _check(res):
370             for p in self.node.last_peers:
371                 allocated = p.ss.allocated
372                 self.failUnlessEqual(len(allocated), 4)
373                 self.failUnlessEqual(p.ss.queries, 2)
374         d.addCallback(_check)
375         return d
376
377     def test_three_of_ten(self):
378         # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
379         # 4+4+2
380
381         self.make_client(3)
382         data = self.get_data(SIZE_LARGE)
383         self.u.DEFAULT_ENCODING_PARAMETERS = (3, 5, 10)
384         d = self.u.upload_data(data)
385         d.addCallback(self._check_large, SIZE_LARGE)
386         def _check(res):
387             counts = {}
388             for p in self.node.last_peers:
389                 allocated = p.ss.allocated
390                 counts[len(allocated)] = counts.get(len(allocated), 0) + 1
391             histogram = [counts.get(i, 0) for i in range(5)]
392             self.failUnlessEqual(histogram, [0,0,0,2,1])
393         d.addCallback(_check)
394         return d
395
396
397 # TODO:
398 #  upload with exactly 75 peers (shares_of_happiness)
399 #  have a download fail
400 #  cancel a download (need to implement more cancel stuff)