# src/allmydata/test/test_storage.py (tahoe-lafs)
# test_storage: yet more coverage
1
2 from twisted.trial import unittest
3
4 from twisted.internet import defer
5 import time, os.path, stat
6 import itertools
7 from allmydata import interfaces
8 from allmydata.util import fileutil, hashutil
9 from allmydata.storage import BucketWriter, BucketReader, \
10      WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \
11      storage_index_to_dir, DataTooLargeError
12 from allmydata.interfaces import BadWriteEnablerError
13 from allmydata.test.common import LoggingServiceParent
14
class Marker:
    """Opaque handle returned to callers who register a disconnect callback."""
class FakeCanary:
    """Stand-in for a Foolscap canary: records disconnect callbacks.

    With ignore_disconnectors=True both registration and unregistration
    become no-ops, which lets tests hand out one shared canary.
    """
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}

    def notifyOnDisconnect(self, f, *args, **kwargs):
        """Record f(*args, **kwargs) for later; return an opaque marker."""
        if self.ignore:
            return
        marker = Marker()
        self.disconnectors[marker] = (f, args, kwargs)
        return marker

    def dontNotifyOnDisconnect(self, marker):
        """Forget the callback previously registered under marker."""
        if not self.ignore:
            del self.disconnectors[marker]
31
class FakeStatsProvider:
    """No-op stats provider handed to StorageServer in the tests."""

    def count(self, name, delta=1):
        """Discard the counter increment."""
        pass

    def register_producer(self, producer):
        """Discard the stats producer registration."""
        pass
37
class Bucket(unittest.TestCase):
    """Tests for BucketWriter/BucketReader against real share files on disk.

    The TestCase doubles as the 'server' argument to BucketWriter and
    BucketReader: those only need the bucket_writer_closed/add_latency/count
    callbacks, provided here as no-ops.
    """
    def make_workdir(self, name):
        # Build storage/Bucket/<name>/{tmp/bucket, bucket} and return the
        # (incoming, final) share paths that BucketWriter expects.
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    # no-op server callbacks invoked by BucketWriter/BucketReader
    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        # Return an (owner_num, renew_secret, cancel_secret, expiration_time)
        # lease tuple with random secrets and an expiry ~83 minutes out.
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return (owner_num, renew_secret, cancel_secret, expiration_time)

    def test_create(self):
        # write 82 bytes in four pieces into a 200-byte bucket, then close
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it: the closed share should hand back exactly the
        # bytes that were written, at the same offsets
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
85
class RemoteBucket:
    """Wrap a local bucket so calls look like remote Foolscap calls.

    callRemote("name", ...) invokes self.target.remote_name(...) and wraps
    the outcome in a Deferred. The attribute lookup happens inside the
    deferred call, so a missing method surfaces as a failed Deferred.
    """

    def callRemote(self, methname, *args, **kwargs):
        def invoke():
            method = getattr(self.target, "remote_" + methname)
            return method(*args, **kwargs)
        return defer.maybeDeferred(invoke)
93
class BucketProxy(unittest.TestCase):
    """Tests for WriteBucketProxy/ReadBucketProxy layered over RemoteBucket."""

    def make_bucket(self, name, size):
        # Create a BucketWriter plus a RemoteBucket wrapper targeting it.
        # Returns (writer, remote-wrapper, final share filename).
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        # (owner_num, renew_secret, cancel_secret, expiration_time) tuple
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return (owner_num, renew_secret, cancel_secret, expiration_time)

    # no-op server callbacks invoked by BucketWriter/BucketReader
    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              segment_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

    def test_readwrite(self):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the three
        # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long

        plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
                            for i in range(7)]
        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket("test_readwrite", 1414)
        bp = WriteBucketProxy(rb,
                              data_size=95,
                              segment_size=25,
                              num_segments=4,
                              num_share_hashes=3,
                              uri_extension_size=len(uri_extension),
                              nodeid=None)

        # write everything through the proxy, in protocol order
        d = bp.start()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back through a ReadBucketProxy and check that
        # each field round-trips unchanged
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = ReadBucketProxy(rb, peerid="abc")
            self.failUnless("to peer" in repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.startIfNecessary()
            d1.addCallback(lambda res: rbp.startIfNecessary()) # idempotent
            d1.addCallback(lambda res: rbp.get_block(0))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block(1))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block(2))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block(3))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_plaintext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, plaintext_hashes))
            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d
212
213
214
class Server(unittest.TestCase):
    """Tests for the immutable-share side of StorageServer: allocation,
    size limits, leases, read-only and discard modes."""

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, sizelimit=None):
        # Build a StorageServer in its own workdir, attached to the service
        # parent so tearDown stops it.
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit,
                           stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        # Helper: allocate buckets using fresh lease secrets each time.
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_dont_overfill_dirs(self):
        """
        This test asserts that if you add a second share whose storage index
        share lots of leading bits with an extant share (but isn't the exact
        same storage index), this won't add an entry to the share directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        # the incoming/ directory tree should be deleted once all writers
        # for a bucket have closed
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        # wb still references the last writer from the loop
        incomingdir = os.path.dirname(os.path.dirname(os.path.dirname(wb.incominghome)))
        self.failIf(os.path.exists(incomingdir))

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_disconnect(self):
        # simulate a disconnection: firing the registered disconnect
        # callbacks should abandon the open (incoming) shares
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    def test_sizelimits(self):
        ss = self.create("test_sizelimits", 5000)
        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grows to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE
        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

        # creating a new StorageServer in the same directory should see the
        # same usage.

        # metadata that goes into the share file is counted upon share close,
        # as well as at startup. metadata that goes into other files will not
        # be counted until the next startup, so if we were creating any
        # extra-file metadata, the allocation would be more than 'allocated'
        # and this test would need to be changed.
        ss = self.create("test_sizelimits", 5000)
        already4,writers4 = self.allocate(ss, "vid4", range(100), 100, canary)
        self.failUnlessEqual(len(writers4), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

    def test_seek(self):
        # sanity-check the platform's seek-past-EOF behavior that the share
        # code relies upon
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        try:
            self.failUnlessEqual(f2.read(5), "start")
        finally:
            f2.close() # previously leaked on both success and failure


    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        # allocate+close si0 under lease rs0/cs0
        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it: only the proper cancel_secret should work
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second should make it go away
        ss.remote_cancel_lease("si1", cs2)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads: a second allocation under different
        # secrets while the first is still open gets no buckets at all
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        # once closed, the second set of secrets adds a second lease
        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        # a read-only server refuses all allocations
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

    def test_discard(self):
        # discard is really only used for other tests, but we test it anyways
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, discard_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
573
574
575
576 class MutableServer(unittest.TestCase):
577
578     def setUp(self):
579         self.sparent = LoggingServiceParent()
580         self._lease_secret = itertools.count()
581     def tearDown(self):
582         return self.sparent.stopService()
583
584     def workdir(self, name):
585         basedir = os.path.join("storage", "MutableServer", name)
586         return basedir
587
588     def create(self, name, sizelimit=None):
589         workdir = self.workdir(name)
590         ss = StorageServer(workdir, sizelimit)
591         ss.setServiceParent(self.sparent)
592         ss.setNodeID("\x00" * 32)
593         return ss
594
595     def test_create(self):
596         ss = self.create("test_create")
597
598     def write_enabler(self, we_tag):
599         return hashutil.tagged_hash("we_blah", we_tag)
600
601     def renew_secret(self, tag):
602         return hashutil.tagged_hash("renew_blah", str(tag))
603
604     def cancel_secret(self, tag):
605         return hashutil.tagged_hash("cancel_blah", str(tag))
606
607     def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
608         write_enabler = self.write_enabler(we_tag)
609         renew_secret = self.renew_secret(lease_tag)
610         cancel_secret = self.cancel_secret(lease_tag)
611         rstaraw = ss.remote_slot_testv_and_readv_and_writev
612         testandwritev = dict( [ (shnum, ([], [], None) )
613                          for shnum in sharenums ] )
614         readv = []
615         rc = rstaraw(storage_index,
616                      (write_enabler, renew_secret, cancel_secret),
617                      testandwritev,
618                      readv)
619         (did_write, readv_data) = rc
620         self.failUnless(did_write)
621         self.failUnless(isinstance(readv_data, dict))
622         self.failUnlessEqual(len(readv_data), 0)
623
624     def test_container_size(self):
625         ss = self.create("test_container_size")
626         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
627                       set([0,1,2]), 100)
628         rstaraw = ss.remote_slot_testv_and_readv_and_writev
629         secrets = ( self.write_enabler("we1"),
630                     self.renew_secret("we1"),
631                     self.cancel_secret("we1") )
632         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
633         answer = rstaraw("si1", secrets,
634                          {0: ([], [(0,data)], len(data)+12)},
635                          [])
636         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
637
638         # trying to make the container too large will raise an exception
639         TOOBIG = MutableShareFile.MAX_SIZE + 10
640         self.failUnlessRaises(DataTooLargeError,
641                               rstaraw, "si1", secrets,
642                               {0: ([], [(0,data)], TOOBIG)},
643                               [])
644
645         # it should be possible to make the container smaller, although at
646         # the moment this doesn't actually affect the share
647         answer = rstaraw("si1", secrets,
648                          {0: ([], [(0,data)], len(data)+8)},
649                          [])
650         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
651
652     def test_allocate(self):
653         ss = self.create("test_allocate")
654         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
655                       set([0,1,2]), 100)
656
657         read = ss.remote_slot_readv
658         self.failUnlessEqual(read("si1", [0], [(0, 10)]),
659                              {0: [""]})
660         self.failUnlessEqual(read("si1", [], [(0, 10)]),
661                              {0: [""], 1: [""], 2: [""]})
662         self.failUnlessEqual(read("si1", [0], [(100, 10)]),
663                              {0: [""]})
664
665         # try writing to one
666         secrets = ( self.write_enabler("we1"),
667                     self.renew_secret("we1"),
668                     self.cancel_secret("we1") )
669         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
670         write = ss.remote_slot_testv_and_readv_and_writev
671         answer = write("si1", secrets,
672                        {0: ([], [(0,data)], None)},
673                        [])
674         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
675
676         self.failUnlessEqual(read("si1", [0], [(0,20)]),
677                              {0: ["00000000001111111111"]})
678         self.failUnlessEqual(read("si1", [0], [(95,10)]),
679                              {0: ["99999"]})
680         #self.failUnlessEqual(s0.remote_get_length(), 100)
681
682         bad_secrets = ("bad write enabler", secrets[1], secrets[2])
683         f = self.failUnlessRaises(BadWriteEnablerError,
684                                   write, "si1", bad_secrets,
685                                   {}, [])
686         self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)
687
688         # this testv should fail
689         answer = write("si1", secrets,
690                        {0: ([(0, 12, "eq", "444444444444"),
691                              (20, 5, "eq", "22222"),
692                              ],
693                             [(0, "x"*100)],
694                             None),
695                         },
696                        [(0,12), (20,5)],
697                        )
698         self.failUnlessEqual(answer, (False,
699                                       {0: ["000000000011", "22222"],
700                                        1: ["", ""],
701                                        2: ["", ""],
702                                        }))
703         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
704
705         # as should this one
706         answer = write("si1", secrets,
707                        {0: ([(10, 5, "lt", "11111"),
708                              ],
709                             [(0, "x"*100)],
710                             None),
711                         },
712                        [(10,5)],
713                        )
714         self.failUnlessEqual(answer, (False,
715                                       {0: ["11111"],
716                                        1: [""],
717                                        2: [""]},
718                                       ))
719         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
720
721
722     def test_operators(self):
723         # test operators, the data we're comparing is '11111' in all cases.
724         # test both fail+pass, reset data after each one.
725         ss = self.create("test_operators")
726
727         secrets = ( self.write_enabler("we1"),
728                     self.renew_secret("we1"),
729                     self.cancel_secret("we1") )
730         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
731         write = ss.remote_slot_testv_and_readv_and_writev
732         read = ss.remote_slot_readv
733
734         def reset():
735             write("si1", secrets,
736                   {0: ([], [(0,data)], None)},
737                   [])
738
739         reset()
740
741         #  lt
742         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
743                                              ],
744                                             [(0, "x"*100)],
745                                             None,
746                                             )}, [(10,5)])
747         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
748         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
749         self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
750         reset()
751
752         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
753                                              ],
754                                             [(0, "x"*100)],
755                                             None,
756                                             )}, [(10,5)])
757         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
758         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
759         reset()
760
761         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
762                                              ],
763                                             [(0, "y"*100)],
764                                             None,
765                                             )}, [(10,5)])
766         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
767         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
768         reset()
769
770         #  le
771         answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
772                                              ],
773                                             [(0, "x"*100)],
774                                             None,
775                                             )}, [(10,5)])
776         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
777         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
778         reset()
779
780         answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
781                                              ],
782                                             [(0, "y"*100)],
783                                             None,
784                                             )}, [(10,5)])
785         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
786         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
787         reset()
788
789         answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
790                                              ],
791                                             [(0, "y"*100)],
792                                             None,
793                                             )}, [(10,5)])
794         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
795         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
796         reset()
797
798         #  eq
799         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
800                                              ],
801                                             [(0, "x"*100)],
802                                             None,
803                                             )}, [(10,5)])
804         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
805         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
806         reset()
807
808         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
809                                              ],
810                                             [(0, "y"*100)],
811                                             None,
812                                             )}, [(10,5)])
813         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
814         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
815         reset()
816
817         #  ne
818         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
819                                              ],
820                                             [(0, "x"*100)],
821                                             None,
822                                             )}, [(10,5)])
823         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
824         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
825         reset()
826
827         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
828                                              ],
829                                             [(0, "y"*100)],
830                                             None,
831                                             )}, [(10,5)])
832         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
833         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
834         reset()
835
836         #  ge
837         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
838                                              ],
839                                             [(0, "y"*100)],
840                                             None,
841                                             )}, [(10,5)])
842         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
843         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
844         reset()
845
846         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
847                                              ],
848                                             [(0, "y"*100)],
849                                             None,
850                                             )}, [(10,5)])
851         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
852         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
853         reset()
854
855         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
856                                              ],
857                                             [(0, "y"*100)],
858                                             None,
859                                             )}, [(10,5)])
860         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
861         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
862         reset()
863
864         #  gt
865         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
866                                              ],
867                                             [(0, "y"*100)],
868                                             None,
869                                             )}, [(10,5)])
870         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
871         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
872         reset()
873
874         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
875                                              ],
876                                             [(0, "x"*100)],
877                                             None,
878                                             )}, [(10,5)])
879         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
880         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
881         reset()
882
883         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
884                                              ],
885                                             [(0, "x"*100)],
886                                             None,
887                                             )}, [(10,5)])
888         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
889         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
890         reset()
891
892         # finally, test some operators against empty shares
893         answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
894                                              ],
895                                             [(0, "x"*100)],
896                                             None,
897                                             )}, [(10,5)])
898         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
899         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
900         reset()
901
902     def test_readv(self):
903         ss = self.create("test_readv")
904         secrets = ( self.write_enabler("we1"),
905                     self.renew_secret("we1"),
906                     self.cancel_secret("we1") )
907         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
908         write = ss.remote_slot_testv_and_readv_and_writev
909         read = ss.remote_slot_readv
910         data = [("%d" % i) * 100 for i in range(3)]
911         rc = write("si1", secrets,
912                    {0: ([], [(0,data[0])], None),
913                     1: ([], [(0,data[1])], None),
914                     2: ([], [(0,data[2])], None),
915                     }, [])
916         self.failUnlessEqual(rc, (True, {}))
917
918         answer = read("si1", [], [(0, 10)])
919         self.failUnlessEqual(answer, {0: ["0"*10],
920                                       1: ["1"*10],
921                                       2: ["2"*10]})
922
923     def compare_leases_without_timestamps(self, a, b):
924         self.failUnlessEqual(len(a), len(b))
925         for i in range(len(a)):
926             (num_a, (ownerid_a, expiration_time_a,
927                    renew_secret_a, cancel_secret_a, nodeid_a)) = a[i]
928             (num_b, (ownerid_b, expiration_time_b,
929                    renew_secret_b, cancel_secret_b, nodeid_b)) = b[i]
930             self.failUnlessEqual( (num_a, ownerid_a, renew_secret_a,
931                                    cancel_secret_a, nodeid_a),
932                                   (num_b, ownerid_b, renew_secret_b,
933                                    cancel_secret_b, nodeid_b) )
934
    def test_leases(self):
        """Lease bookkeeping on mutable slots.

        Covers: adding leases via writes with distinct secrets, direct
        renew/cancel, lease survival across container expansion, IndexError
        for unknown secrets, and deletion of the share only after the last
        lease is cancelled.
        """
        ss = self.create("test_leases", sizelimit=1000*1000)
        def secrets(n):
            # one (write-enabler, renew-secret, cancel-secret) triple per
            # lease number; the write-enabler is the same for all of them,
            # so every write below is accepted
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        # re-allocate the slots and use the same secrets, that should update
        # the lease (not add a second one)
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])

        # 6 leases were added and 1 cancelled, so 5 remain on share 0
        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        all_leases = s0.debug_get_leases()
        self.failUnlessEqual(len(all_leases), 5)

        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        # alone.
        all_leases = s0.debug_get_leases()
        # renewing with a bogus token should prompt an error message

        # TODO: examine the exception thus raised, make sure the old nodeid
        # is present, to provide for share migration
        self.failUnlessRaises(IndexError,
                              ss.remote_renew_lease, "si1",
                              secrets(20)[1])
        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
                              secrets(20)[2])
        # the failed renew/cancel attempts must not have touched the leases
        self.failUnlessEqual(all_leases, s0.debug_get_leases())

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.failUnlessEqual(all_leases, s0.debug_get_leases())

        # appending data (growing the share in place) keeps leases intact
        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        # now cancel them all
        ss.remote_cancel_lease("si1", secrets(0)[2])
        ss.remote_cancel_lease("si1", secrets(1)[2])
        ss.remote_cancel_lease("si1", secrets(2)[2])
        ss.remote_cancel_lease("si1", secrets(3)[2])

        # the slot should still be there: one lease (from secrets(4)) remains
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        # cancelling a non-existent lease should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1", "nonsecret")

        # and the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})

        # cancelling a lease on a non-existent share should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si2", "nonsecret")
1054
1055 class Stats(unittest.TestCase):
1056
1057     def setUp(self):
1058         self.sparent = LoggingServiceParent()
1059         self._lease_secret = itertools.count()
    def tearDown(self):
        """Stop the parent service; returns its (possibly deferred) result
        so trial waits for shutdown to finish."""
        return self.sparent.stopService()
1062
1063     def workdir(self, name):
1064         basedir = os.path.join("storage", "Server", name)
1065         return basedir
1066
1067     def create(self, name, sizelimit=None):
1068         workdir = self.workdir(name)
1069         ss = StorageServer(workdir, sizelimit)
1070         ss.setServiceParent(self.sparent)
1071         return ss
1072
    def test_latencies(self):
        """add_latency/get_latencies: per-category sample buffers and
        mean/median/percentile statistics."""
        ss = self.create("test_latencies")
        # 10000 samples, but (per the len() assertion below) only the most
        # recent 1000 are retained, i.e. values 9000.0 .. 9999.0
        for i in range(10000):
            ss.add_latency("allocate", 1.0 * i)
        for i in range(1000):
            ss.add_latency("renew", 1.0 * i)
        for i in range(10):
            ss.add_latency("cancel", 2.0 * i)
        ss.add_latency("get", 5.0)

        output = ss.get_latencies()

        # one stats entry per category that received samples
        self.failUnlessEqual(sorted(output.keys()),
                             sorted(["allocate", "renew", "cancel", "get"]))
        self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
        # stats are compared with a tolerance of 1 to avoid float fragility
        self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["median"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["90_percentile"] - 9900) < 1)
        self.failUnless(abs(output["allocate"]["95_percentile"] - 9950) < 1)
        self.failUnless(abs(output["allocate"]["99_percentile"] - 9990) < 1)
        self.failUnless(abs(output["allocate"]["999_percentile"] - 9999) < 1)

        self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
        self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
        self.failUnless(abs(output["renew"]["median"] - 500) < 1)
        self.failUnless(abs(output["renew"]["90_percentile"] - 900) < 1)
        self.failUnless(abs(output["renew"]["95_percentile"] - 950) < 1)
        self.failUnless(abs(output["renew"]["99_percentile"] - 990) < 1)
        self.failUnless(abs(output["renew"]["999_percentile"] - 999) < 1)

        # with only 10 samples, the high percentiles all collapse to the
        # largest value (18.0)
        self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
        self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
        self.failUnless(abs(output["cancel"]["median"] - 10) < 1)
        self.failUnless(abs(output["cancel"]["90_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["95_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["99_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["999_percentile"] - 18) < 1)

        # a single sample: every statistic is that sample
        self.failUnlessEqual(len(ss.latencies["get"]), 1)
        self.failUnless(abs(output["get"]["mean"] - 5) < 1)
        self.failUnless(abs(output["get"]["median"] - 5) < 1)
        self.failUnless(abs(output["get"]["90_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["95_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["99_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["999_percentile"] - 5) < 1)