]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_upload.py
more hierarchical logging: download/upload/encode
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_upload.py
1
2 import os
3 from twisted.trial import unittest
4 from twisted.python.failure import Failure
5 from twisted.internet import defer
6 from cStringIO import StringIO
7
8 from allmydata import upload, encode, uri
9 from allmydata.interfaces import IFileURI
10 from allmydata.util.assertutil import precondition
11 from foolscap import eventual
12
13 class Uploadable(unittest.TestCase):
14     def shouldEqual(self, data, expected):
15         self.failUnless(isinstance(data, list))
16         for e in data:
17             self.failUnless(isinstance(e, str))
18         s = "".join(data)
19         self.failUnlessEqual(s, expected)
20
21     def test_filehandle(self):
22         s = StringIO("a"*41)
23         u = upload.FileHandle(s)
24         d = u.get_size()
25         d.addCallback(self.failUnlessEqual, 41)
26         d.addCallback(lambda res: u.read(1))
27         d.addCallback(self.shouldEqual, "a")
28         d.addCallback(lambda res: u.read(80))
29         d.addCallback(self.shouldEqual, "a"*40)
30         d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
31         d.addCallback(lambda res: s.close()) # that privilege is reserved for us
32         return d
33
34     def test_filename(self):
35         basedir = "upload/Uploadable/test_filename"
36         os.makedirs(basedir)
37         fn = os.path.join(basedir, "file")
38         f = open(fn, "w")
39         f.write("a"*41)
40         f.close()
41         u = upload.FileName(fn)
42         d = u.get_size()
43         d.addCallback(self.failUnlessEqual, 41)
44         d.addCallback(lambda res: u.read(1))
45         d.addCallback(self.shouldEqual, "a")
46         d.addCallback(lambda res: u.read(80))
47         d.addCallback(self.shouldEqual, "a"*40)
48         d.addCallback(lambda res: u.close())
49         return d
50
51     def test_data(self):
52         s = "a"*41
53         u = upload.Data(s)
54         d = u.get_size()
55         d.addCallback(self.failUnlessEqual, 41)
56         d.addCallback(lambda res: u.read(1))
57         d.addCallback(self.shouldEqual, "a")
58         d.addCallback(lambda res: u.read(80))
59         d.addCallback(self.shouldEqual, "a"*40)
60         d.addCallback(lambda res: u.close())
61         return d
62
63 class FakePeer:
64     def __init__(self, mode="good"):
65         self.ss = FakeStorageServer(mode)
66
67     def callRemote(self, methname, *args, **kwargs):
68         def _call():
69             meth = getattr(self, methname)
70             return meth(*args, **kwargs)
71         return defer.maybeDeferred(_call)
72
73     def get_service(self, sname):
74         assert sname == "storageserver"
75         return self.ss
76
77 class FakeStorageServer:
78     def __init__(self, mode):
79         self.mode = mode
80         self.allocated = []
81         self.queries = 0
82     def callRemote(self, methname, *args, **kwargs):
83         def _call():
84             meth = getattr(self, methname)
85             return meth(*args, **kwargs)
86         d = eventual.fireEventually()
87         d.addCallback(lambda res: _call())
88         return d
89
90     def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
91                          sharenums, share_size, canary):
92         #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
93         self.queries += 1
94         if self.mode == "full":
95             return (set(), {},)
96         elif self.mode == "already got them":
97             return (set(sharenums), {},)
98         else:
99             for shnum in sharenums:
100                 self.allocated.append( (storage_index, shnum) )
101             return (set(),
102                     dict([( shnum, FakeBucketWriter(share_size) )
103                           for shnum in sharenums]),
104                     )
105
106 class FakeBucketWriter:
107     # a diagnostic version of storageserver.BucketWriter
108     def __init__(self, size):
109         self.data = StringIO()
110         self.closed = False
111         self._size = size
112
113     def callRemote(self, methname, *args, **kwargs):
114         def _call():
115             meth = getattr(self, "remote_" + methname)
116             return meth(*args, **kwargs)
117         d = eventual.fireEventually()
118         d.addCallback(lambda res: _call())
119         return d
120
121     def remote_write(self, offset, data):
122         precondition(not self.closed)
123         precondition(offset >= 0)
124         precondition(offset+len(data) <= self._size,
125                      "offset=%d + data=%d > size=%d" %
126                      (offset, len(data), self._size))
127         self.data.seek(offset)
128         self.data.write(data)
129
130     def remote_close(self):
131         precondition(not self.closed)
132         self.closed = True
133
134 class FakeIntroducerClient:
135     def when_enough_peers(self, numpeers):
136         return defer.succeed(None)
137
138 class FakeClient:
139     def __init__(self, mode="good", num_servers=50):
140         self.mode = mode
141         self.num_servers = num_servers
142     def get_permuted_peers(self, storage_index, include_myself):
143         peers = [ ("%20d"%fakeid, "%20d"%fakeid, FakePeer(self.mode),)
144                   for fakeid in range(self.num_servers) ]
145         self.last_peers = [p[2] for p in peers]
146         return peers
147     def get_push_to_ourselves(self):
148         return None
149     def get_encoding_parameters(self):
150         return None
151
152     def get_renewal_secret(self):
153         return ""
154     def get_cancel_secret(self):
155         return ""
156
157 DATA = """
158 Once upon a time, there was a beautiful princess named Buttercup. She lived
159 in a magical land where every file was stored securely among millions of
160 machines, and nobody ever worried about their data being lost ever again.
161 The End.
162 """
163 assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
164
165 SIZE_ZERO = 0
166 SIZE_SMALL = 16
167 SIZE_LARGE = len(DATA)
168
169 class GoodServer(unittest.TestCase):
170     def setUp(self):
171         self.node = FakeClient(mode="good")
172         self.u = upload.Uploader()
173         self.u.running = True
174         self.u.parent = self.node
175
176     def _check_small(self, newuri, size):
177         u = IFileURI(newuri)
178         self.failUnless(isinstance(u, uri.LiteralFileURI))
179         self.failUnlessEqual(len(u.data), size)
180
181     def _check_large(self, newuri, size):
182         u = IFileURI(newuri)
183         self.failUnless(isinstance(u, uri.CHKFileURI))
184         self.failUnless(isinstance(u.storage_index, str))
185         self.failUnlessEqual(len(u.storage_index), 16)
186         self.failUnless(isinstance(u.key, str))
187         self.failUnlessEqual(len(u.key), 16)
188         self.failUnlessEqual(u.size, size)
189
190     def get_data(self, size):
191         return DATA[:size]
192
193     def test_data_zero(self):
194         data = self.get_data(SIZE_ZERO)
195         d = self.u.upload_data(data)
196         d.addCallback(self._check_small, SIZE_ZERO)
197         return d
198
199     def test_data_small(self):
200         data = self.get_data(SIZE_SMALL)
201         d = self.u.upload_data(data)
202         d.addCallback(self._check_small, SIZE_SMALL)
203         return d
204
205     def test_data_large(self):
206         data = self.get_data(SIZE_LARGE)
207         d = self.u.upload_data(data)
208         d.addCallback(self._check_large, SIZE_LARGE)
209         return d
210
211     def test_data_large_odd_segments(self):
212         data = self.get_data(SIZE_LARGE)
213         segsize = int(SIZE_LARGE / 2.5)
214         # we want 3 segments, since that's not a power of two
215         d = self.u.upload_data(data, {"max_segment_size": segsize})
216         d.addCallback(self._check_large, SIZE_LARGE)
217         return d
218
219     def test_filehandle_zero(self):
220         data = self.get_data(SIZE_ZERO)
221         d = self.u.upload_filehandle(StringIO(data))
222         d.addCallback(self._check_small, SIZE_ZERO)
223         return d
224
225     def test_filehandle_small(self):
226         data = self.get_data(SIZE_SMALL)
227         d = self.u.upload_filehandle(StringIO(data))
228         d.addCallback(self._check_small, SIZE_SMALL)
229         return d
230
231     def test_filehandle_large(self):
232         data = self.get_data(SIZE_LARGE)
233         d = self.u.upload_filehandle(StringIO(data))
234         d.addCallback(self._check_large, SIZE_LARGE)
235         return d
236
237     def test_filename_zero(self):
238         fn = "Uploader-test_filename_zero.data"
239         f = open(fn, "wb")
240         data = self.get_data(SIZE_ZERO)
241         f.write(data)
242         f.close()
243         d = self.u.upload_filename(fn)
244         d.addCallback(self._check_small, SIZE_ZERO)
245         return d
246
247     def test_filename_small(self):
248         fn = "Uploader-test_filename_small.data"
249         f = open(fn, "wb")
250         data = self.get_data(SIZE_SMALL)
251         f.write(data)
252         f.close()
253         d = self.u.upload_filename(fn)
254         d.addCallback(self._check_small, SIZE_SMALL)
255         return d
256
257     def test_filename_large(self):
258         fn = "Uploader-test_filename_large.data"
259         f = open(fn, "wb")
260         data = self.get_data(SIZE_LARGE)
261         f.write(data)
262         f.close()
263         d = self.u.upload_filename(fn)
264         d.addCallback(self._check_large, SIZE_LARGE)
265         return d
266
267 class FullServer(unittest.TestCase):
268     def setUp(self):
269         self.node = FakeClient(mode="full")
270         self.u = upload.Uploader()
271         self.u.running = True
272         self.u.parent = self.node
273
274     def _should_fail(self, f):
275         self.failUnless(isinstance(f, Failure) and f.check(encode.NotEnoughPeersError), f)
276
277     def test_data_large(self):
278         data = DATA
279         d = self.u.upload_data(data)
280         d.addBoth(self._should_fail)
281         return d
282
283 class PeerSelection(unittest.TestCase):
284
285     def make_client(self, num_servers=50):
286         self.node = FakeClient(mode="good", num_servers=num_servers)
287         self.u = upload.Uploader()
288         self.u.running = True
289         self.u.parent = self.node
290
291     def get_data(self, size):
292         return DATA[:size]
293
294     def _check_large(self, newuri, size):
295         u = IFileURI(newuri)
296         self.failUnless(isinstance(u, uri.CHKFileURI))
297         self.failUnless(isinstance(u.storage_index, str))
298         self.failUnlessEqual(len(u.storage_index), 16)
299         self.failUnless(isinstance(u.key, str))
300         self.failUnlessEqual(len(u.key), 16)
301         self.failUnlessEqual(u.size, size)
302
303     def test_one_each(self):
304         # if we have 50 shares, and there are 50 peers, and they all accept a
305         # share, we should get exactly one share per peer
306
307         self.make_client()
308         data = self.get_data(SIZE_LARGE)
309         self.u.DEFAULT_ENCODING_PARAMETERS = (25, 30, 50)
310         d = self.u.upload_data(data)
311         d.addCallback(self._check_large, SIZE_LARGE)
312         def _check(res):
313             for p in self.node.last_peers:
314                 allocated = p.ss.allocated
315                 self.failUnlessEqual(len(allocated), 1)
316                 self.failUnlessEqual(p.ss.queries, 1)
317         d.addCallback(_check)
318         return d
319
320     def test_two_each(self):
321         # if we have 100 shares, and there are 50 peers, and they all accept
322         # all shares, we should get exactly two shares per peer
323
324         self.make_client()
325         data = self.get_data(SIZE_LARGE)
326         self.u.DEFAULT_ENCODING_PARAMETERS = (50, 75, 100)
327         d = self.u.upload_data(data)
328         d.addCallback(self._check_large, SIZE_LARGE)
329         def _check(res):
330             for p in self.node.last_peers:
331                 allocated = p.ss.allocated
332                 self.failUnlessEqual(len(allocated), 2)
333                 self.failUnlessEqual(p.ss.queries, 2)
334         d.addCallback(_check)
335         return d
336
337     def test_one_each_plus_one_extra(self):
338         # if we have 51 shares, and there are 50 peers, then one peer gets
339         # two shares and the rest get just one
340
341         self.make_client()
342         data = self.get_data(SIZE_LARGE)
343         self.u.DEFAULT_ENCODING_PARAMETERS = (24, 41, 51)
344         d = self.u.upload_data(data)
345         d.addCallback(self._check_large, SIZE_LARGE)
346         def _check(res):
347             got_one = []
348             got_two = []
349             for p in self.node.last_peers:
350                 allocated = p.ss.allocated
351                 self.failUnless(len(allocated) in (1,2), len(allocated))
352                 if len(allocated) == 1:
353                     self.failUnlessEqual(p.ss.queries, 1)
354                     got_one.append(p)
355                 else:
356                     self.failUnlessEqual(p.ss.queries, 2)
357                     got_two.append(p)
358             self.failUnlessEqual(len(got_one), 49)
359             self.failUnlessEqual(len(got_two), 1)
360         d.addCallback(_check)
361         return d
362
363     def test_four_each(self):
364         # if we have 200 shares, and there are 50 peers, then each peer gets
365         # 4 shares. The design goal is to accomplish this with only two
366         # queries per peer.
367
368         self.make_client()
369         data = self.get_data(SIZE_LARGE)
370         self.u.DEFAULT_ENCODING_PARAMETERS = (100, 150, 200)
371         d = self.u.upload_data(data)
372         d.addCallback(self._check_large, SIZE_LARGE)
373         def _check(res):
374             for p in self.node.last_peers:
375                 allocated = p.ss.allocated
376                 self.failUnlessEqual(len(allocated), 4)
377                 self.failUnlessEqual(p.ss.queries, 2)
378         d.addCallback(_check)
379         return d
380
381     def test_three_of_ten(self):
382         # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
383         # 4+4+2
384
385         self.make_client(3)
386         data = self.get_data(SIZE_LARGE)
387         self.u.DEFAULT_ENCODING_PARAMETERS = (3, 5, 10)
388         d = self.u.upload_data(data)
389         d.addCallback(self._check_large, SIZE_LARGE)
390         def _check(res):
391             counts = {}
392             for p in self.node.last_peers:
393                 allocated = p.ss.allocated
394                 counts[len(allocated)] = counts.get(len(allocated), 0) + 1
395             histogram = [counts.get(i, 0) for i in range(5)]
396             self.failUnlessEqual(histogram, [0,0,0,2,1])
397         d.addCallback(_check)
398         return d
399
400
401 # TODO:
402 #  upload with exactly 75 peers (shares_of_happiness)
403 #  have a download fail
404 #  cancel a download (need to implement more cancel stuff)