]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_storage.py
storage: improve stats, make them accessible via webport /statistics
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_storage.py
1
2 from twisted.trial import unittest
3
4 from twisted.internet import defer
5 import time, os.path, stat
6 import itertools
7 from allmydata import interfaces
8 from allmydata.util import fileutil, hashutil
9 from allmydata.storage import BucketWriter, BucketReader, \
10      WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \
11      storage_index_to_dir
12 from allmydata.interfaces import BadWriteEnablerError
13 from allmydata.test.common import LoggingServiceParent
14
class FakeCanary:
    """Stand-in for a Foolscap 'canary' remote reference.

    The storage server registers disconnect notifications on the canary;
    in these local tests there is no remote connection, so both
    registration calls are accepted and ignored.
    """
    def notifyOnDisconnect(self, *args, **kwargs):
        # Nothing to watch; pretend the registration succeeded.
        return None
    def dontNotifyOnDisconnect(self, marker):
        # Matching no-op for the registration above.
        return None
20
class Bucket(unittest.TestCase):
    """Round-trip tests for BucketWriter/BucketReader on the local disk."""

    def make_workdir(self, name):
        """Create a per-test working directory; return (incoming, final) paths."""
        base = os.path.join("storage", "Bucket", name)
        fileutil.make_dirs(base)
        fileutil.make_dirs(os.path.join(base, "tmp"))
        return (os.path.join(base, "tmp", "bucket"),
                os.path.join(base, "bucket"))

    # This TestCase doubles as the BucketWriter's "server" object: the
    # writer invokes these callbacks, and the stats are irrelevant here.
    def bucket_writer_closed(self, bw, consumed):
        pass

    def add_latency(self, category, latency):
        pass

    def count(self, name, delta=1):
        pass

    def make_lease(self):
        """Return a lease 4-tuple: owner 0, random secrets, expiry in 5000s."""
        return (0, os.urandom(32), os.urandom(32), time.time() + 5000)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        # fill the 82-byte payload in four sequential writes, then close
        for offset, piece in [(0, "a"*25), (25, "b"*25),
                              (50, "c"*25), (75, "d"*7)]:
            bw.remote_write(offset, piece)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # read the finished share back and check every region
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
68
class RemoteBucket:
    """Wrap a local bucket object so it quacks like a Foolscap remote
    reference: callRemote("foo", ...) invokes self.target.remote_foo(...)
    and always returns a Deferred.

    The caller is expected to assign `self.target` after construction.
    """

    def callRemote(self, methname, *args, **kwargs):
        def _invoke():
            method = getattr(self.target, "remote_" + methname)
            return method(*args, **kwargs)
        # maybeDeferred converts both plain results and exceptions into
        # Deferred semantics, matching a real remote call
        return defer.maybeDeferred(_invoke)
76
class BucketProxy(unittest.TestCase):
    """Tests for WriteBucketProxy/ReadBucketProxy, which translate the
    structured put_*/get_* share API into offset-based writes and reads
    of a single immutable share file."""

    def make_bucket(self, name, size):
        """Create a BucketWriter of `size` bytes plus a RemoteBucket
        wrapper for it; return (writer, wrapper, final-share-path)."""
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        """Return a lease 4-tuple: owner 0, random secrets, expiry in 5000s."""
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return (owner_num, renew_secret, cancel_secret, expiration_time)

    # This TestCase doubles as the BucketWriter's "server" object: the
    # writer invokes these callbacks, and the stats are irrelevant here.
    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        # a freshly built proxy must provide the IStorageBucketWriter interface
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              segment_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

    def test_readwrite(self):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the three
        # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long

        plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
                            for i in range(7)]
        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        # share_hashes entries are (share_number, hash) pairs
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        # distinct first/last bytes make truncation errors visible
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket("test_readwrite", 1414)
        bp = WriteBucketProxy(rb,
                              data_size=95,
                              segment_size=25,
                              num_segments=4,
                              num_share_hashes=3,
                              uri_extension_size=len(uri_extension),
                              nodeid=None)

        # write every section of the share in order, then close
        d = bp.start()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back through a ReadBucketProxy and verify it
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = ReadBucketProxy(rb)
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.startIfNecessary()
            d1.addCallback(lambda res: rbp.get_block(0))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block(1))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block(2))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block(3))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_plaintext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, plaintext_hashes))
            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d
193
194
195
class Server(unittest.TestCase):
    """Tests for StorageServer's immutable-share side: bucket allocation,
    write/read round-trips, sizelimit accounting, and lease
    create/renew/cancel semantics."""

    def setUp(self):
        self.sparent = LoggingServiceParent()
        # each .next() call yields a fresh integer, so every lease created
        # in a test gets distinct renew/cancel secrets
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        """Return the per-test base directory for the server's storage."""
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, sizelimit=None):
        """Create a StorageServer rooted at workdir(name) and start it."""
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        # construction and service startup must not raise
        ss = self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size):
        """Call remote_allocate_buckets with fresh lease secrets; returns
        (set of already-present shnums, dict of shnum -> BucketWriter)."""
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, FakeCanary())

    def test_dont_overfill_dirs(self):
        """
        This test asserts that if you add a second share whose storage index
        share lots of leading bits with an extant share (but isn't the exact
        same storage index), this won't add an entry to the share directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        # once the last writer closes, the incoming/ scaffolding directories
        # should be removed entirely
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        # 'wb' deliberately leaks out of the loop; climb three path levels up
        # from the last writer's incominghome to reach the incoming/ root
        incomingdir = os.path.dirname(os.path.dirname(os.path.dirname(wb.incominghome)))
        self.failIf(os.path.exists(incomingdir))

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})

        canary = FakeCanary()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()

        # now they should be readable
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "vid", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([5]))

    def test_sizelimits(self):
        ss = self.create("test_sizelimits", 5000)
        canary = FakeCanary()
        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4

        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        # NOTE(review): this appears to rely on CPython's prompt refcount
        # collection of the dropped BucketWriters — confirm before porting
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grows to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        # 'allocated' documents the arithmetic spelled out in the comment
        # below; it is not used in the assertions
        allocated = 1001 + OVERHEAD + LEASE_SIZE
        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

        # creating a new StorageServer in the same directory should see the
        # same usage.

        # metadata that goes into the share file is counted upon share close,
        # as well as at startup. metadata that goes into other files will not
        # be counted until the next startup, so if we were creating any
        # extra-file metadata, the allocation would be more than 'allocated'
        # and this test would need to be changed.
        ss = self.create("test_sizelimits", 5000)
        already4,writers4 = self.allocate(ss, "vid4", range(100), 100)
        self.failUnlessEqual(len(writers4), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

    def test_seek(self):
        # sanity-check the seek-past-EOF-then-write behavior that the share
        # file code depends upon
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")


    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        # rsN/csN are the renew/cancel secret pair for lease N
        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        # closing all five shares produced exactly one lease; element [1] of
        # each lease tuple is its renew secret
        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second should make it go away
        ss.remote_cancel_lease("si1", cs2)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads: a second allocation while writers are
        # still open must offer nothing, and must not add a lease yet
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        # repeating the rs4/cs4 allocation after close adds the second lease
        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)
491
492
493
494 class MutableServer(unittest.TestCase):
495
496     def setUp(self):
497         self.sparent = LoggingServiceParent()
498         self._lease_secret = itertools.count()
499     def tearDown(self):
500         return self.sparent.stopService()
501
502     def workdir(self, name):
503         basedir = os.path.join("storage", "MutableServer", name)
504         return basedir
505
506     def create(self, name, sizelimit=None):
507         workdir = self.workdir(name)
508         ss = StorageServer(workdir, sizelimit)
509         ss.setServiceParent(self.sparent)
510         ss.setNodeID("\x00" * 32)
511         return ss
512
513     def test_create(self):
514         ss = self.create("test_create")
515
516     def write_enabler(self, we_tag):
517         return hashutil.tagged_hash("we_blah", we_tag)
518
519     def renew_secret(self, tag):
520         return hashutil.tagged_hash("renew_blah", str(tag))
521
522     def cancel_secret(self, tag):
523         return hashutil.tagged_hash("cancel_blah", str(tag))
524
525     def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
526         write_enabler = self.write_enabler(we_tag)
527         renew_secret = self.renew_secret(lease_tag)
528         cancel_secret = self.cancel_secret(lease_tag)
529         rstaraw = ss.remote_slot_testv_and_readv_and_writev
530         testandwritev = dict( [ (shnum, ([], [], None) )
531                          for shnum in sharenums ] )
532         readv = []
533         rc = rstaraw(storage_index,
534                      (write_enabler, renew_secret, cancel_secret),
535                      testandwritev,
536                      readv)
537         (did_write, readv_data) = rc
538         self.failUnless(did_write)
539         self.failUnless(isinstance(readv_data, dict))
540         self.failUnlessEqual(len(readv_data), 0)
541
    def test_allocate(self):
        """Mutable slots: fresh slots read as empty, a plain write lands,
        a wrong write-enabler is rejected, and failed test vectors leave
        the data untouched while reporting the current contents."""
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                               set([0,1,2]), 100)

        # freshly allocated shares read back as empty strings, even past
        # the nominal size
        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        # 100 bytes: "0"*10 + "1"*10 + ... + "9"*10
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        # a wrong write enabler must be rejected, and the error message
        # names the nodeid that recorded the original enabler
        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)

        # this testv should fail: the share holds '0'*10+'1'*10..., not
        # '4'*12 at offset 0. The reply reports current contents and the
        # write must NOT be applied.
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one: "11111" is not lt "11111"
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
610
611
612     def test_operators(self):
613         # test operators, the data we're comparing is '11111' in all cases.
614         # test both fail+pass, reset data after each one.
615         ss = self.create("test_operators")
616
617         secrets = ( self.write_enabler("we1"),
618                     self.renew_secret("we1"),
619                     self.cancel_secret("we1") )
620         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
621         write = ss.remote_slot_testv_and_readv_and_writev
622         read = ss.remote_slot_readv
623
624         def reset():
625             write("si1", secrets,
626                   {0: ([], [(0,data)], None)},
627                   [])
628
629         reset()
630
631         #  lt
632         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
633                                              ],
634                                             [(0, "x"*100)],
635                                             None,
636                                             )}, [(10,5)])
637         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
638         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
639         self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
640         reset()
641
642         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
643                                              ],
644                                             [(0, "x"*100)],
645                                             None,
646                                             )}, [(10,5)])
647         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
648         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
649         reset()
650
651         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
652                                              ],
653                                             [(0, "y"*100)],
654                                             None,
655                                             )}, [(10,5)])
656         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
657         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
658         reset()
659
660         #  le
661         answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
662                                              ],
663                                             [(0, "x"*100)],
664                                             None,
665                                             )}, [(10,5)])
666         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
667         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
668         reset()
669
670         answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
671                                              ],
672                                             [(0, "y"*100)],
673                                             None,
674                                             )}, [(10,5)])
675         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
676         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
677         reset()
678
679         answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
680                                              ],
681                                             [(0, "y"*100)],
682                                             None,
683                                             )}, [(10,5)])
684         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
685         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
686         reset()
687
688         #  eq
689         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
690                                              ],
691                                             [(0, "x"*100)],
692                                             None,
693                                             )}, [(10,5)])
694         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
695         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
696         reset()
697
698         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
699                                              ],
700                                             [(0, "y"*100)],
701                                             None,
702                                             )}, [(10,5)])
703         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
704         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
705         reset()
706
707         #  ne
708         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
709                                              ],
710                                             [(0, "x"*100)],
711                                             None,
712                                             )}, [(10,5)])
713         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
714         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
715         reset()
716
717         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
718                                              ],
719                                             [(0, "y"*100)],
720                                             None,
721                                             )}, [(10,5)])
722         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
723         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
724         reset()
725
726         #  ge
727         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
728                                              ],
729                                             [(0, "y"*100)],
730                                             None,
731                                             )}, [(10,5)])
732         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
733         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
734         reset()
735
736         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
737                                              ],
738                                             [(0, "y"*100)],
739                                             None,
740                                             )}, [(10,5)])
741         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
742         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
743         reset()
744
745         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
746                                              ],
747                                             [(0, "y"*100)],
748                                             None,
749                                             )}, [(10,5)])
750         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
751         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
752         reset()
753
754         #  gt
755         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
756                                              ],
757                                             [(0, "y"*100)],
758                                             None,
759                                             )}, [(10,5)])
760         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
761         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
762         reset()
763
764         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
765                                              ],
766                                             [(0, "x"*100)],
767                                             None,
768                                             )}, [(10,5)])
769         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
770         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
771         reset()
772
773         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
774                                              ],
775                                             [(0, "x"*100)],
776                                             None,
777                                             )}, [(10,5)])
778         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
779         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
780         reset()
781
782     def test_readv(self):
783         ss = self.create("test_readv")
784         secrets = ( self.write_enabler("we1"),
785                     self.renew_secret("we1"),
786                     self.cancel_secret("we1") )
787         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
788         write = ss.remote_slot_testv_and_readv_and_writev
789         read = ss.remote_slot_readv
790         data = [("%d" % i) * 100 for i in range(3)]
791         rc = write("si1", secrets,
792                    {0: ([], [(0,data[0])], None),
793                     1: ([], [(0,data[1])], None),
794                     2: ([], [(0,data[2])], None),
795                     }, [])
796         self.failUnlessEqual(rc, (True, {}))
797
798         answer = read("si1", [], [(0, 10)])
799         self.failUnlessEqual(answer, {0: ["0"*10],
800                                       1: ["1"*10],
801                                       2: ["2"*10]})
802
803     def compare_leases_without_timestamps(self, a, b):
804         self.failUnlessEqual(len(a), len(b))
805         for i in range(len(a)):
806             (num_a, (ownerid_a, expiration_time_a,
807                    renew_secret_a, cancel_secret_a, nodeid_a)) = a[i]
808             (num_b, (ownerid_b, expiration_time_b,
809                    renew_secret_b, cancel_secret_b, nodeid_b)) = b[i]
810             self.failUnlessEqual( (num_a, ownerid_a, renew_secret_a,
811                                    cancel_secret_a, nodeid_a),
812                                   (num_b, ownerid_b, renew_secret_b,
813                                    cancel_secret_b, nodeid_b) )
814
    def test_leases(self):
        """Exercise lease handling on a mutable slot: lease creation and
        renewal, survival across container expansion, rejection of bogus
        secrets, and deletion of the share only when its last lease is
        cancelled."""
        ss = self.create("test_leases")
        def secrets(n):
            # same write-enabler for every call, but distinct renew/cancel
            # secrets per n, so each value of n creates an independent lease
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        # re-allocate the slots and use the same secrets, that should update
        # the lease
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])

        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        all_leases = s0.debug_get_leases()
        # six leases were created and one was cancelled, so five remain
        self.failUnlessEqual(len(all_leases), 5)

        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        # renewing touches only the timestamps, which the comparison ignores
        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        # alone.
        all_leases = s0.debug_get_leases()
        # renewing with a bogus token should prompt an error message

        # TODO: examine the exception thus raised, make sure the old nodeid
        # is present, to provide for share migration
        self.failUnlessRaises(IndexError,
                              ss.remote_renew_lease, "si1",
                              secrets(20)[1])
        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
                              secrets(20)[2])
        # the failed renew/cancel must not have changed anything, timestamps
        # included, so exact equality is checked here
        self.failUnlessEqual(all_leases, s0.debug_get_leases())

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.failUnlessEqual(all_leases, s0.debug_get_leases())

        # writes that grow the share move the leases but keep their contents
        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        # now cancel them all
        ss.remote_cancel_lease("si1", secrets(0)[2])
        ss.remote_cancel_lease("si1", secrets(1)[2])
        ss.remote_cancel_lease("si1", secrets(2)[2])
        ss.remote_cancel_lease("si1", secrets(3)[2])

        # the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})
920
921 class Stats(unittest.TestCase):
922
923     def setUp(self):
924         self.sparent = LoggingServiceParent()
925         self._lease_secret = itertools.count()
926     def tearDown(self):
927         return self.sparent.stopService()
928
929     def workdir(self, name):
930         basedir = os.path.join("storage", "Server", name)
931         return basedir
932
933     def create(self, name, sizelimit=None):
934         workdir = self.workdir(name)
935         ss = StorageServer(workdir, sizelimit)
936         ss.setServiceParent(self.sparent)
937         return ss
938
    def test_latencies(self):
        """Feed known latency samples into add_latency() and check the
        per-category statistics reported by get_latencies()."""
        ss = self.create("test_latencies")
        for i in range(10000):
            ss.add_latency("allocate", 1.0 * i)
        for i in range(1000):
            ss.add_latency("renew", 1.0 * i)
        for i in range(10):
            ss.add_latency("cancel", 2.0 * i)
        ss.add_latency("get", 5.0)

        output = ss.get_latencies()

        self.failUnlessEqual(sorted(output.keys()),
                             sorted(["allocate", "renew", "cancel", "get"]))
        # only the most recent 1000 samples per category are retained, so
        # of the 10000 "allocate" samples, values 9000..9999 survive
        self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
        self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["median"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["90_percentile"] - 9900) < 1)
        self.failUnless(abs(output["allocate"]["95_percentile"] - 9950) < 1)
        self.failUnless(abs(output["allocate"]["99_percentile"] - 9990) < 1)
        self.failUnless(abs(output["allocate"]["999_percentile"] - 9999) < 1)

        # "renew" got exactly 1000 samples (0..999), so none were dropped
        self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
        self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
        self.failUnless(abs(output["renew"]["median"] - 500) < 1)
        self.failUnless(abs(output["renew"]["90_percentile"] - 900) < 1)
        self.failUnless(abs(output["renew"]["95_percentile"] - 950) < 1)
        self.failUnless(abs(output["renew"]["99_percentile"] - 990) < 1)
        self.failUnless(abs(output["renew"]["999_percentile"] - 999) < 1)

        # with only 10 samples (0,2,..,18) the upper percentiles all land on
        # the largest sample
        self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
        self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
        self.failUnless(abs(output["cancel"]["median"] - 10) < 1)
        self.failUnless(abs(output["cancel"]["90_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["95_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["99_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["999_percentile"] - 18) < 1)

        # a single sample: every statistic should equal that sample
        self.failUnlessEqual(len(ss.latencies["get"]), 1)
        self.failUnless(abs(output["get"]["mean"] - 5) < 1)
        self.failUnless(abs(output["get"]["median"] - 5) < 1)
        self.failUnless(abs(output["get"]["90_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["95_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["99_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["999_percentile"] - 5) < 1)