
from twisted.trial import unittest

from twisted.application import service
from twisted.internet import defer
from foolscap import Referenceable
import time, os.path, stat
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil
from allmydata.storage import BucketWriter, BucketReader, \
     WriteBucketProxy, ReadBucketProxy, StorageServer

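# These tests exercise the storage backend at three levels: BucketWriter and
# BucketReader directly (Bucket), the WriteBucketProxy/ReadBucketProxy share
# layout over a fake remote reference (BucketProxy), and the StorageServer's
# allocation, size-limit, and lease handling (Server).
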
class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return (owner_num, renew_secret, cancel_secret, expiration_time)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(final)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

class RemoteBucket:
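    # A minimal loopback stand-in for a foolscap RemoteReference: callRemote()
    # looks up the matching remote_* method on self.target and wraps the
    # result in a Deferred via maybeDeferred, so the bucket proxies can be
    # exercised without a real network connection.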

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)

class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return (owner_num, renew_secret, cancel_secret, expiration_time)

    def bucket_writer_closed(self, bw, consumed):
        pass

    def test_create(self):
        bw, rb, final = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              segment_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size=500)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

    def test_readwrite(self):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the three
        # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
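        #
        # (Presumably: 0x24 = 36 bytes of header/offset table written by
        # WriteBucketProxy, 7*32 for each of the three hash trees, 2+32 per
        # share_hashes entry (share number plus hash), and 4+500 for a
        # length-prefixed uri_extension. Note that the code below actually
        # writes only 95 bytes of share data -- the last block is short --
        # which still fits within the 1414-byte allocation.)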

        plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
                            for i in range(7)]
        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, final = self.make_bucket("test_readwrite", 1414)
        bp = WriteBucketProxy(rb,
                              data_size=95,
                              segment_size=25,
                              num_segments=4,
                              num_share_hashes=3,
                              uri_extension_size=len(uri_extension))

        d = bp.start()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(final)
            rb = RemoteBucket()
            rb.target = br
            rbp = ReadBucketProxy(rb)
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.startIfNecessary()
            d1.addCallback(lambda res: rbp.get_block(0))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block(1))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block(2))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block(3))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_plaintext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, plaintext_hashes))
            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d



class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = service.MultiService()
        self._secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, sizelimit=None):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

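    # allocate() wraps remote_allocate_buckets() with fresh renew/cancel
    # secrets for each call. It returns (already, writers): the set of share
    # numbers the server already holds, and a dict mapping each new share
    # number to its bucket writer.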
    def allocate(self, ss, storage_index, sharenums, size):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next())
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, Referenceable())

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incomingdir = os.path.join(self.workdir("test_remove_incoming"),
                                   "shares", "incoming")
        leftover_dirs = os.listdir(incomingdir)
        self.failUnlessEqual(leftover_dirs, [])

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})

        canary = Referenceable()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})

        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()

        # now they should be readable
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present
        already,writers = self.allocate(ss, "vid", [0,1,2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # tell new uploaders that they already exist (so that we don't try to
        # upload into them a second time)

        already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
        self.failUnlessEqual(already, set([2,3,4]))
        self.failUnlessEqual(set(writers.keys()), set([5]))

    def test_sizelimits(self):
        ss = self.create("test_sizelimits", 5000)
        canary = Referenceable()
        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
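        # LEASE_SIZE presumably breaks down as a 4-byte owner number, 32-byte
        # renew secret, 32-byte cancel secret, and 4-byte expiration time
        # (the same fields the make_lease() helpers above build), while
        # OVERHEAD presumably covers three 4-byte header fields in the share
        # file.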

        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grow to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE
        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100-byte shares
        already3,writers3 = self.allocate(ss, "vid3", range(100), 100)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

        # creating a new StorageServer in the same directory should see the
        # same usage.

        # metadata that goes into the share file is counted upon share close,
        # as well as at startup. metadata that goes into other files will not
        # be counted until the next startup, so if we were creating any
        # extra-file metadata, the allocation would be more than 'allocated'
        # and this test would need to be changed.
        ss = self.create("test_sizelimits", 5000)
        already4,writers4 = self.allocate(ss, "vid4", range(100), 100)
        self.failUnlessEqual(len(writers4), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")


    def test_leases(self):
        ss = self.create("test_leases")
        canary = Referenceable()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second should make it go away
        ss.remote_cancel_lease("si1", cs2)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 5)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)