import os.path
import itertools

from twisted.trial import unittest
from twisted.application import service
from twisted.internet import defer
from foolscap import Referenceable

from allmydata import interfaces
from allmydata.util import fileutil, hashutil
from allmydata.storage import BucketWriter, BucketReader, \
     WriteBucketProxy, ReadBucketProxy, StorageServer

class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200)
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200)
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(final)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

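# RemoteBucket stands in for a foolscap RemoteReference: callRemote("foo",
# *args) is forwarded to self.target.remote_foo(*args) and the result is
# wrapped in a Deferred, so the bucket proxies below can be exercised without
# a real network connection.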
class RemoteBucket:

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)

class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size)
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def bucket_writer_closed(self, bw, consumed):
        pass

    def test_create(self):
        bw, rb, final = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              segment_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size=500)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

    def test_readwrite(self):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the three
        # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x1c + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1406 bytes long
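        #
        # A quick arithmetic check of that total (the 0x1c bytes are the
        # fixed per-share header this layout assumes):
        self.failUnlessEqual(0x1c + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500,
                             1406)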

        plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
                            for i in range(7)]
        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, final = self.make_bucket("test_readwrite", 1406)
        bp = WriteBucketProxy(rb,
                              data_size=95,
                              segment_size=25,
                              num_segments=4,
                              num_share_hashes=3,
                              uri_extension_size=len(uri_extension))

        d = bp.start()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(final)
            rb = RemoteBucket()
            rb.target = br
            rbp = ReadBucketProxy(rb)
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.startIfNecessary()
            d1.addCallback(lambda res: rbp.get_block(0))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block(1))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block(2))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block(3))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_plaintext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, plaintext_hashes))
            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d


class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = service.MultiService()
        self._secret = itertools.count()

    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, sizelimit=None):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

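    # allocate() mints a fresh renew/cancel lease secret pair from an
    # incrementing counter on every call, so repeated allocations within a
    # test never reuse a secret.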
    def allocate(self, ss, storage_index, sharenums, size):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next())
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, Referenceable())

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})

        canary = Referenceable()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})

        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()

        # now they should be readable
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present
        already,writers = self.allocate(ss, "vid", [0,1,2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # tell new uploaders that they already exist (so that we don't try to
        # upload into them a second time)

        already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
        self.failUnlessEqual(already, set([2,3,4]))
        self.failUnlessEqual(set(writers.keys()), set([5]))

    def test_sizelimits(self):
        ss = self.create("test_sizelimits", 100)
        canary = Referenceable()

        already,writers = self.allocate(ss, "vid1", [0,1,2], 25)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 75 bytes provisionally allocated,
        # allowing only 25 more to be claimed
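        # (3 shares * 25 bytes = 75 of the 100-byte sizelimit)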
        self.failUnlessEqual(len(ss._active_writers), 3)

        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 25)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
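        # (nothing else references those BucketWriters, so dropping our
        # references lets them be garbage-collected; the server evidently
        # tracks active writers weakly and frees their provisional space)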
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        # now there should be 25 bytes allocated, and 75 free
        already3,writers3 = self.allocate(ss, "vid3", [0,1,2,3], 25)
        self.failUnlessEqual(len(writers3), 3)
        self.failUnlessEqual(len(ss._active_writers), 3)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

        # creating a new StorageServer in the same directory should see the
        # same usage. note that metadata will be counted at startup but not
        # during runtime, so if we were creating any metadata, the allocation
        # would be more than 25 bytes and this test would need to be changed.
        ss = self.create("test_sizelimits", 100)
        already4,writers4 = self.allocate(ss, "vid4", [0,1,2,3], 25)
        self.failUnlessEqual(len(writers4), 3)
        self.failUnlessEqual(len(ss._active_writers), 3)

    def test_leases(self):
        ss = self.create("test_leases")
        canary = Referenceable()
        sharenums = range(5)
        size = 100

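        # Each allocation takes out a lease identified by a (renew_secret,
        # cancel_secret) pair: renewing requires the renew secret, cancelling
        # requires the cancel secret, and the shares disappear once the last
        # lease is cancelled.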
        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)

        # cancel the first lease on si1, leaving the second in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second should make it go away
        ss.remote_cancel_lease("si1", cs2)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)