]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_upload.py
upload: return an UploadResults instance (with .uri) instead of just a URI
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_upload.py
1
2 import os
3 from twisted.trial import unittest
4 from twisted.python.failure import Failure
5 from twisted.python import log
6 from cStringIO import StringIO
7
8 from allmydata import upload, encode, uri
9 from allmydata.interfaces import IFileURI
10 from allmydata.util.assertutil import precondition
11 from foolscap import eventual
12
13 MiB = 1024*1024
14
15 def extract_uri(results):
16     return results.uri
17
18 class Uploadable(unittest.TestCase):
19     def shouldEqual(self, data, expected):
20         self.failUnless(isinstance(data, list))
21         for e in data:
22             self.failUnless(isinstance(e, str))
23         s = "".join(data)
24         self.failUnlessEqual(s, expected)
25
26     def test_filehandle_random_key(self):
27         return self._test_filehandle(True)
28
29     def test_filehandle_content_hash_key(self):
30         return self._test_filehandle(False)
31
32     def _test_filehandle(self, randomkey):
33         s = StringIO("a"*41)
34         u = upload.FileHandle(s, randomkey)
35         d = u.get_size()
36         d.addCallback(self.failUnlessEqual, 41)
37         d.addCallback(lambda res: u.read(1))
38         d.addCallback(self.shouldEqual, "a")
39         d.addCallback(lambda res: u.read(80))
40         d.addCallback(self.shouldEqual, "a"*40)
41         d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
42         d.addCallback(lambda res: s.close()) # that privilege is reserved for us
43         return d
44
45     def test_filename(self):
46         basedir = "upload/Uploadable/test_filename"
47         os.makedirs(basedir)
48         fn = os.path.join(basedir, "file")
49         f = open(fn, "w")
50         f.write("a"*41)
51         f.close()
52         u = upload.FileName(fn)
53         d = u.get_size()
54         d.addCallback(self.failUnlessEqual, 41)
55         d.addCallback(lambda res: u.read(1))
56         d.addCallback(self.shouldEqual, "a")
57         d.addCallback(lambda res: u.read(80))
58         d.addCallback(self.shouldEqual, "a"*40)
59         d.addCallback(lambda res: u.close())
60         return d
61
62     def test_data(self):
63         s = "a"*41
64         u = upload.Data(s)
65         d = u.get_size()
66         d.addCallback(self.failUnlessEqual, 41)
67         d.addCallback(lambda res: u.read(1))
68         d.addCallback(self.shouldEqual, "a")
69         d.addCallback(lambda res: u.read(80))
70         d.addCallback(self.shouldEqual, "a"*40)
71         d.addCallback(lambda res: u.close())
72         return d
73
74 class FakeStorageServer:
75     def __init__(self, mode):
76         self.mode = mode
77         self.allocated = []
78         self.queries = 0
79     def callRemote(self, methname, *args, **kwargs):
80         def _call():
81             meth = getattr(self, methname)
82             return meth(*args, **kwargs)
83         d = eventual.fireEventually()
84         d.addCallback(lambda res: _call())
85         return d
86
87     def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
88                          sharenums, share_size, canary):
89         #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
90         self.queries += 1
91         if self.mode == "full":
92             return (set(), {},)
93         elif self.mode == "already got them":
94             return (set(sharenums), {},)
95         else:
96             for shnum in sharenums:
97                 self.allocated.append( (storage_index, shnum) )
98             return (set(),
99                     dict([( shnum, FakeBucketWriter(share_size) )
100                           for shnum in sharenums]),
101                     )
102
103 class FakeBucketWriter:
104     # a diagnostic version of storageserver.BucketWriter
105     def __init__(self, size):
106         self.data = StringIO()
107         self.closed = False
108         self._size = size
109
110     def callRemote(self, methname, *args, **kwargs):
111         def _call():
112             meth = getattr(self, "remote_" + methname)
113             return meth(*args, **kwargs)
114         d = eventual.fireEventually()
115         d.addCallback(lambda res: _call())
116         return d
117
118     def remote_write(self, offset, data):
119         precondition(not self.closed)
120         precondition(offset >= 0)
121         precondition(offset+len(data) <= self._size,
122                      "offset=%d + data=%d > size=%d" %
123                      (offset, len(data), self._size))
124         self.data.seek(offset)
125         self.data.write(data)
126
127     def remote_close(self):
128         precondition(not self.closed)
129         self.closed = True
130
131     def remote_abort(self):
132         log.err("uh oh, I was asked to abort")
133
134 class FakeClient:
135     DEFAULT_ENCODING_PARAMETERS = {"k":25,
136                                    "happy": 75,
137                                    "n": 100,
138                                    "max_segment_size": 1*MiB,
139                                    }
140     def __init__(self, mode="good", num_servers=50):
141         self.mode = mode
142         self.num_servers = num_servers
143     def log(self, *args, **kwargs):
144         pass
145     def get_permuted_peers(self, storage_index, include_myself):
146         peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),)
147                   for fakeid in range(self.num_servers) ]
148         self.last_peers = [p[1] for p in peers]
149         return peers
150     def get_encoding_parameters(self):
151         return self.DEFAULT_ENCODING_PARAMETERS
152
153     def get_renewal_secret(self):
154         return ""
155     def get_cancel_secret(self):
156         return ""
157
158 DATA = """
159 Once upon a time, there was a beautiful princess named Buttercup. She lived
160 in a magical land where every file was stored securely among millions of
161 machines, and nobody ever worried about their data being lost ever again.
162 The End.
163 """
164 assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
165
166 SIZE_ZERO = 0
167 SIZE_SMALL = 16
168 SIZE_LARGE = len(DATA)
169
170 def upload_data(uploader, data):
171     u = upload.Data(data)
172     return uploader.upload(u)
173 def upload_filename(uploader, filename):
174     u = upload.FileName(filename)
175     return uploader.upload(u)
176 def upload_filehandle(uploader, fh):
177     u = upload.FileHandle(fh)
178     return uploader.upload(u)
179
180 class GoodServer(unittest.TestCase):
181     def setUp(self):
182         self.node = FakeClient(mode="good")
183         self.u = upload.Uploader()
184         self.u.running = True
185         self.u.parent = self.node
186
187     def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
188         p = {"k": k,
189              "happy": happy,
190              "n": n,
191              "max_segment_size": max_segsize,
192              }
193         self.node.DEFAULT_ENCODING_PARAMETERS = p
194
195     def _check_small(self, newuri, size):
196         u = IFileURI(newuri)
197         self.failUnless(isinstance(u, uri.LiteralFileURI))
198         self.failUnlessEqual(len(u.data), size)
199
200     def _check_large(self, newuri, size):
201         u = IFileURI(newuri)
202         self.failUnless(isinstance(u, uri.CHKFileURI))
203         self.failUnless(isinstance(u.storage_index, str))
204         self.failUnlessEqual(len(u.storage_index), 16)
205         self.failUnless(isinstance(u.key, str))
206         self.failUnlessEqual(len(u.key), 16)
207         self.failUnlessEqual(u.size, size)
208
209     def get_data(self, size):
210         return DATA[:size]
211
212     def test_data_zero(self):
213         data = self.get_data(SIZE_ZERO)
214         d = upload_data(self.u, data)
215         d.addCallback(extract_uri)
216         d.addCallback(self._check_small, SIZE_ZERO)
217         return d
218
219     def test_data_small(self):
220         data = self.get_data(SIZE_SMALL)
221         d = upload_data(self.u, data)
222         d.addCallback(extract_uri)
223         d.addCallback(self._check_small, SIZE_SMALL)
224         return d
225
226     def test_data_large(self):
227         data = self.get_data(SIZE_LARGE)
228         d = upload_data(self.u, data)
229         d.addCallback(extract_uri)
230         d.addCallback(self._check_large, SIZE_LARGE)
231         return d
232
233     def test_data_large_odd_segments(self):
234         data = self.get_data(SIZE_LARGE)
235         segsize = int(SIZE_LARGE / 2.5)
236         # we want 3 segments, since that's not a power of two
237         self.set_encoding_parameters(25, 75, 100, segsize)
238         d = upload_data(self.u, data)
239         d.addCallback(extract_uri)
240         d.addCallback(self._check_large, SIZE_LARGE)
241         return d
242
243     def test_filehandle_zero(self):
244         data = self.get_data(SIZE_ZERO)
245         d = upload_filehandle(self.u, StringIO(data))
246         d.addCallback(extract_uri)
247         d.addCallback(self._check_small, SIZE_ZERO)
248         return d
249
250     def test_filehandle_small(self):
251         data = self.get_data(SIZE_SMALL)
252         d = upload_filehandle(self.u, StringIO(data))
253         d.addCallback(extract_uri)
254         d.addCallback(self._check_small, SIZE_SMALL)
255         return d
256
257     def test_filehandle_large(self):
258         data = self.get_data(SIZE_LARGE)
259         d = upload_filehandle(self.u, StringIO(data))
260         d.addCallback(extract_uri)
261         d.addCallback(self._check_large, SIZE_LARGE)
262         return d
263
264     def test_filename_zero(self):
265         fn = "Uploader-test_filename_zero.data"
266         f = open(fn, "wb")
267         data = self.get_data(SIZE_ZERO)
268         f.write(data)
269         f.close()
270         d = upload_filename(self.u, fn)
271         d.addCallback(extract_uri)
272         d.addCallback(self._check_small, SIZE_ZERO)
273         return d
274
275     def test_filename_small(self):
276         fn = "Uploader-test_filename_small.data"
277         f = open(fn, "wb")
278         data = self.get_data(SIZE_SMALL)
279         f.write(data)
280         f.close()
281         d = upload_filename(self.u, fn)
282         d.addCallback(extract_uri)
283         d.addCallback(self._check_small, SIZE_SMALL)
284         return d
285
286     def test_filename_large(self):
287         fn = "Uploader-test_filename_large.data"
288         f = open(fn, "wb")
289         data = self.get_data(SIZE_LARGE)
290         f.write(data)
291         f.close()
292         d = upload_filename(self.u, fn)
293         d.addCallback(extract_uri)
294         d.addCallback(self._check_large, SIZE_LARGE)
295         return d
296
297 class FullServer(unittest.TestCase):
298     def setUp(self):
299         self.node = FakeClient(mode="full")
300         self.u = upload.Uploader()
301         self.u.running = True
302         self.u.parent = self.node
303
304     def _should_fail(self, f):
305         self.failUnless(isinstance(f, Failure) and f.check(encode.NotEnoughPeersError), f)
306
307     def test_data_large(self):
308         data = DATA
309         d = upload_data(self.u, data)
310         d.addBoth(self._should_fail)
311         return d
312
313 class PeerSelection(unittest.TestCase):
314
315     def make_client(self, num_servers=50):
316         self.node = FakeClient(mode="good", num_servers=num_servers)
317         self.u = upload.Uploader()
318         self.u.running = True
319         self.u.parent = self.node
320
321     def get_data(self, size):
322         return DATA[:size]
323
324     def _check_large(self, newuri, size):
325         u = IFileURI(newuri)
326         self.failUnless(isinstance(u, uri.CHKFileURI))
327         self.failUnless(isinstance(u.storage_index, str))
328         self.failUnlessEqual(len(u.storage_index), 16)
329         self.failUnless(isinstance(u.key, str))
330         self.failUnlessEqual(len(u.key), 16)
331         self.failUnlessEqual(u.size, size)
332
333     def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
334         p = {"k": k,
335              "happy": happy,
336              "n": n,
337              "max_segment_size": max_segsize,
338              }
339         self.node.DEFAULT_ENCODING_PARAMETERS = p
340
341     def test_one_each(self):
342         # if we have 50 shares, and there are 50 peers, and they all accept a
343         # share, we should get exactly one share per peer
344
345         self.make_client()
346         data = self.get_data(SIZE_LARGE)
347         self.set_encoding_parameters(25, 30, 50)
348         d = upload_data(self.u, data)
349         d.addCallback(extract_uri)
350         d.addCallback(self._check_large, SIZE_LARGE)
351         def _check(res):
352             for p in self.node.last_peers:
353                 allocated = p.allocated
354                 self.failUnlessEqual(len(allocated), 1)
355                 self.failUnlessEqual(p.queries, 1)
356         d.addCallback(_check)
357         return d
358
359     def test_two_each(self):
360         # if we have 100 shares, and there are 50 peers, and they all accept
361         # all shares, we should get exactly two shares per peer
362
363         self.make_client()
364         data = self.get_data(SIZE_LARGE)
365         self.set_encoding_parameters(50, 75, 100)
366         d = upload_data(self.u, data)
367         d.addCallback(extract_uri)
368         d.addCallback(self._check_large, SIZE_LARGE)
369         def _check(res):
370             for p in self.node.last_peers:
371                 allocated = p.allocated
372                 self.failUnlessEqual(len(allocated), 2)
373                 self.failUnlessEqual(p.queries, 2)
374         d.addCallback(_check)
375         return d
376
377     def test_one_each_plus_one_extra(self):
378         # if we have 51 shares, and there are 50 peers, then one peer gets
379         # two shares and the rest get just one
380
381         self.make_client()
382         data = self.get_data(SIZE_LARGE)
383         self.set_encoding_parameters(24, 41, 51)
384         d = upload_data(self.u, data)
385         d.addCallback(extract_uri)
386         d.addCallback(self._check_large, SIZE_LARGE)
387         def _check(res):
388             got_one = []
389             got_two = []
390             for p in self.node.last_peers:
391                 allocated = p.allocated
392                 self.failUnless(len(allocated) in (1,2), len(allocated))
393                 if len(allocated) == 1:
394                     self.failUnlessEqual(p.queries, 1)
395                     got_one.append(p)
396                 else:
397                     self.failUnlessEqual(p.queries, 2)
398                     got_two.append(p)
399             self.failUnlessEqual(len(got_one), 49)
400             self.failUnlessEqual(len(got_two), 1)
401         d.addCallback(_check)
402         return d
403
404     def test_four_each(self):
405         # if we have 200 shares, and there are 50 peers, then each peer gets
406         # 4 shares. The design goal is to accomplish this with only two
407         # queries per peer.
408
409         self.make_client()
410         data = self.get_data(SIZE_LARGE)
411         self.set_encoding_parameters(100, 150, 200)
412         d = upload_data(self.u, data)
413         d.addCallback(extract_uri)
414         d.addCallback(self._check_large, SIZE_LARGE)
415         def _check(res):
416             for p in self.node.last_peers:
417                 allocated = p.allocated
418                 self.failUnlessEqual(len(allocated), 4)
419                 self.failUnlessEqual(p.queries, 2)
420         d.addCallback(_check)
421         return d
422
423     def test_three_of_ten(self):
424         # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
425         # 4+4+2
426
427         self.make_client(3)
428         data = self.get_data(SIZE_LARGE)
429         self.set_encoding_parameters(3, 5, 10)
430         d = upload_data(self.u, data)
431         d.addCallback(extract_uri)
432         d.addCallback(self._check_large, SIZE_LARGE)
433         def _check(res):
434             counts = {}
435             for p in self.node.last_peers:
436                 allocated = p.allocated
437                 counts[len(allocated)] = counts.get(len(allocated), 0) + 1
438             histogram = [counts.get(i, 0) for i in range(5)]
439             self.failUnlessEqual(histogram, [0,0,0,2,1])
440         d.addCallback(_check)
441         return d
442
443
444 # TODO:
445 #  upload with exactly 75 peers (shares_of_happiness)
446 #  have a download fail
447 #  cancel a download (need to implement more cancel stuff)