
from twisted.trial import unittest

from twisted.internet import defer
import time, os.path, stat
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil
from allmydata.storage import BucketWriter, BucketReader, \
     WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \
     storage_index_to_dir, DataTooLargeError
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent

class Marker:
    pass
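
# A stand-in for the Foolscap "canary" object that clients pass to
# remote_allocate_buckets(): it records the disconnect callbacks that the
# server registers, so tests can invoke them by hand (see
# Server.test_disconnect) or ignore them entirely.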
class FakeCanary:
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if self.ignore:
            return
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        if self.ignore:
            return
        del self.disconnectors[marker]

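# A no-op stats provider: it accepts counter increments and producer
# registrations and discards them, which is all the StorageServer needs
# from its stats_provider during these tests.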
class FakeStatsProvider:
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass

class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

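    # The TestCase itself is passed to BucketWriter/BucketReader as their
    # parent server, so these no-op methods satisfy the callbacks they
    # make on that object.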
    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return (owner_num, renew_secret, cancel_secret, expiration_time)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

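# Emulates a Foolscap RemoteReference in-process: callRemote() dispatches
# to the target's matching remote_* method, wrapped in maybeDeferred so
# results are delivered asynchronously, as they would be over the wire.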
class RemoteBucket:

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)

class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return (owner_num, renew_secret, cancel_secret, expiration_time)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              segment_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

    def test_readwrite(self):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the three
        # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
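        #
        # (a sketch of that arithmetic, on the assumption that 0x24 = 36
        # bytes is the fixed header and offset table: each merkle hash is
        # 32 bytes, each share-hash entry is a 2-byte share number plus a
        # 32-byte hash, and the 4 bytes before the 500-byte URI extension
        # hold its length)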

        plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
                            for i in range(7)]
        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket("test_readwrite", 1414)
        bp = WriteBucketProxy(rb,
                              data_size=95,
                              segment_size=25,
                              num_segments=4,
                              num_share_hashes=3,
                              uri_extension_size=len(uri_extension),
                              nodeid=None)

        d = bp.start()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = ReadBucketProxy(rb, peerid="abc")
            self.failUnless("to peer" in repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.startIfNecessary()
            d1.addCallback(lambda res: rbp.startIfNecessary()) # idempotent
            d1.addCallback(lambda res: rbp.get_block(0))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block(1))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block(2))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block(3))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_plaintext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, plaintext_hashes))
            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d


class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, sizelimit=None):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit,
                           stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

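    # A convenience wrapper around remote_allocate_buckets() that supplies
    # freshly generated lease secrets. It returns (already, writers):
    # 'already' is the set of sharenums the server already holds, and
    # 'writers' maps each newly allocated sharenum to a BucketWriter.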
    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_dont_overfill_dirs(self):
        """
        This test asserts that adding a second share whose storage index
        shares many leading bits with an existing share (but is not the
        exact same storage index) does not add a new entry to the share
        directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incomingdir = os.path.dirname(os.path.dirname(os.path.dirname(wb.incominghome)))
        self.failIf(os.path.exists(incomingdir))

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        canary = FakeCanary()
        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
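        # fire the disconnect handlers that the server registered with the
        # canary, as if the client connection had just been lost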
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    def test_sizelimits(self):
        ss = self.create("test_sizelimits", 5000)
        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
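        # (a lease here matches the (owner_num, renew_secret, cancel_secret,
        # expiration_time) tuple built by make_lease() earlier in this file:
        # a 4-byte owner number, two 32-byte secrets, and a 4-byte
        # expiration time, 72 bytes in all)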
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation
        # should become a real, long-term allocation and grow to include
        # the overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE
        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100-byte shares
        already3,writers3 = self.allocate(ss, "vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

        # creating a new StorageServer in the same directory should see the
        # same usage.

        # metadata that goes into the share file is counted upon share close,
        # as well as at startup. metadata that goes into other files will not
        # be counted until the next startup, so if we were creating any
        # extra-file metadata, the allocation would be more than 'allocated'
        # and this test would need to be changed.
        ss = self.create("test_sizelimits", 5000)
        already4,writers4 = self.allocate(ss, "vid4", range(100), 100, canary)
        self.failUnlessEqual(len(writers4), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")


    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second should make it go away
        ss.remote_cancel_lease("si1", cs2)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        canary = FakeCanary()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

    def test_discard(self):
        # discard is really only used for other tests, but we test it anyway
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, discard_storage=True)
        ss.setServiceParent(self.sparent)

        canary = FakeCanary()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)


class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name, sizelimit=None):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit)
        ss.setServiceParent(self.sparent)
        ss.setNodeID("\x00" * 32)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

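    # remote_slot_testv_and_readv_and_writev takes (storage_index,
    # (write_enabler, renew_secret, cancel_secret), test-and-write
    # vectors, read vector) and returns (wrote, read_data): 'wrote' is
    # True only if every test vector matched, in which case the write
    # vectors were applied, and read_data maps each share number to the
    # data named by the read vector.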
    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                                for shnum in sharenums ] )
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
                              [])

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})


    def test_operators(self):
        # test the comparison operators; the data being compared is '11111'
        # in all cases. Test both failure and success, resetting the data
        # after each case.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        #  gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

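    # Compare two lease lists while ignoring the expiration timestamps,
    # which move forward whenever a lease is renewed.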
    def compare_leases_without_timestamps(self, a, b):
        self.failUnlessEqual(len(a), len(b))
        for i in range(len(a)):
            (num_a, (ownerid_a, expiration_time_a,
                     renew_secret_a, cancel_secret_a, nodeid_a)) = a[i]
            (num_b, (ownerid_b, expiration_time_b,
                     renew_secret_b, cancel_secret_b, nodeid_b)) = b[i]
            self.failUnlessEqual( (num_a, ownerid_a, renew_secret_a,
                                   cancel_secret_a, nodeid_a),
                                  (num_b, ownerid_b, renew_secret_b,
                                   cancel_secret_b, nodeid_b) )

    def test_leases(self):
        ss = self.create("test_leases", sizelimit=1000*1000)
        def secrets(n):
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        # re-allocate the slots and use the same secrets; that should update
        # the lease
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])

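        # open share 0's container file directly, so its lease slots can be
        # inspected without going through the server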
        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        all_leases = s0.debug_get_leases()
        self.failUnlessEqual(len(all_leases), 5)

        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        # alone.
        all_leases = s0.debug_get_leases()
        # renewing with a bogus token should prompt an error message

        # TODO: examine the exception thus raised, make sure the old nodeid
        # is present, to provide for share migration
        self.failUnlessRaises(IndexError,
                              ss.remote_renew_lease, "si1",
                              secrets(20)[1])
        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
                              secrets(20)[2])
        self.failUnlessEqual(all_leases, s0.debug_get_leases())

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.failUnlessEqual(all_leases, s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        # now cancel them all
        ss.remote_cancel_lease("si1", secrets(0)[2])
        ss.remote_cancel_lease("si1", secrets(1)[2])
        ss.remote_cancel_lease("si1", secrets(2)[2])
        ss.remote_cancel_lease("si1", secrets(3)[2])

        # the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        # cancelling a non-existent lease should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1", "nonsecret")

        # and the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})

        # cancelling a lease on a non-existent share should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si2", "nonsecret")


class Stats(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, sizelimit=None):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit)
        ss.setServiceParent(self.sparent)
        return ss

    def test_latencies(self):
        ss = self.create("test_latencies")
        for i in range(10000):
            ss.add_latency("allocate", 1.0 * i)
        for i in range(1000):
            ss.add_latency("renew", 1.0 * i)
        for i in range(10):
            ss.add_latency("cancel", 2.0 * i)
        ss.add_latency("get", 5.0)
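        # the latency buffers apparently retain only the most recent 1000
        # samples per category: 10000 "allocate" samples went in, but the
        # assertion below expects just 1000 to remain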

        output = ss.get_latencies()

        self.failUnlessEqual(sorted(output.keys()),
                             sorted(["allocate", "renew", "cancel", "get"]))
        self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
        self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["median"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["90_percentile"] - 9900) < 1)
        self.failUnless(abs(output["allocate"]["95_percentile"] - 9950) < 1)
        self.failUnless(abs(output["allocate"]["99_percentile"] - 9990) < 1)
        self.failUnless(abs(output["allocate"]["999_percentile"] - 9999) < 1)

        self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
        self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
        self.failUnless(abs(output["renew"]["median"] - 500) < 1)
        self.failUnless(abs(output["renew"]["90_percentile"] - 900) < 1)
        self.failUnless(abs(output["renew"]["95_percentile"] - 950) < 1)
        self.failUnless(abs(output["renew"]["99_percentile"] - 990) < 1)
        self.failUnless(abs(output["renew"]["999_percentile"] - 999) < 1)

        self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
        self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
        self.failUnless(abs(output["cancel"]["median"] - 10) < 1)
        self.failUnless(abs(output["cancel"]["90_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["95_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["99_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["999_percentile"] - 18) < 1)

        self.failUnlessEqual(len(ss.latencies["get"]), 1)
        self.failUnless(abs(output["get"]["mean"] - 5) < 1)
        self.failUnless(abs(output["get"]["median"] - 5) < 1)
        self.failUnless(abs(output["get"]["90_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["95_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["99_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["999_percentile"] - 5) < 1)