# git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git / src / allmydata / test / test_storage.py
# storage: measure latency-per-operation, calculate mean/median/percentiles
import itertools
import os.path
import stat
import time

from twisted.trial import unittest
from twisted.internet import defer

from allmydata import interfaces
from allmydata.util import fileutil, hashutil
from allmydata.storage import BucketWriter, BucketReader, \
     WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \
     storage_index_to_dir
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent

15 class FakeCanary:
16     def notifyOnDisconnect(self, *args, **kwargs):
17         pass
18     def dontNotifyOnDisconnect(self, marker):
19         pass
20
21 class Bucket(unittest.TestCase):
22     def make_workdir(self, name):
23         basedir = os.path.join("storage", "Bucket", name)
24         incoming = os.path.join(basedir, "tmp", "bucket")
25         final = os.path.join(basedir, "bucket")
26         fileutil.make_dirs(basedir)
27         fileutil.make_dirs(os.path.join(basedir, "tmp"))
28         return incoming, final
29
30     def bucket_writer_closed(self, bw, consumed):
31         pass
32     def add_latency(self, category, latency):
33         pass
34
35     def make_lease(self):
36         owner_num = 0
37         renew_secret = os.urandom(32)
38         cancel_secret = os.urandom(32)
39         expiration_time = time.time() + 5000
40         return (owner_num, renew_secret, cancel_secret, expiration_time)
41
42     def test_create(self):
43         incoming, final = self.make_workdir("test_create")
44         bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
45                           FakeCanary())
46         bw.remote_write(0, "a"*25)
47         bw.remote_write(25, "b"*25)
48         bw.remote_write(50, "c"*25)
49         bw.remote_write(75, "d"*7)
50         bw.remote_close()
51
52     def test_readwrite(self):
53         incoming, final = self.make_workdir("test_readwrite")
54         bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
55                           FakeCanary())
56         bw.remote_write(0, "a"*25)
57         bw.remote_write(25, "b"*25)
58         bw.remote_write(50, "c"*7) # last block may be short
59         bw.remote_close()
60
61         # now read from it
62         br = BucketReader(self, bw.finalhome)
63         self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
64         self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
65         self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
66
67 class RemoteBucket:
68
69     def callRemote(self, methname, *args, **kwargs):
70         def _call():
71             meth = getattr(self.target, "remote_" + methname)
72             return meth(*args, **kwargs)
73         return defer.maybeDeferred(_call)
74
75 class BucketProxy(unittest.TestCase):
76     def make_bucket(self, name, size):
77         basedir = os.path.join("storage", "BucketProxy", name)
78         incoming = os.path.join(basedir, "tmp", "bucket")
79         final = os.path.join(basedir, "bucket")
80         fileutil.make_dirs(basedir)
81         fileutil.make_dirs(os.path.join(basedir, "tmp"))
82         bw = BucketWriter(self, incoming, final, size, self.make_lease(),
83                           FakeCanary())
84         rb = RemoteBucket()
85         rb.target = bw
86         return bw, rb, final
87
88     def make_lease(self):
89         owner_num = 0
90         renew_secret = os.urandom(32)
91         cancel_secret = os.urandom(32)
92         expiration_time = time.time() + 5000
93         return (owner_num, renew_secret, cancel_secret, expiration_time)
94
95     def bucket_writer_closed(self, bw, consumed):
96         pass
97     def add_latency(self, category, latency):
98         pass
99
100     def test_create(self):
101         bw, rb, sharefname = self.make_bucket("test_create", 500)
102         bp = WriteBucketProxy(rb,
103                               data_size=300,
104                               segment_size=10,
105                               num_segments=5,
106                               num_share_hashes=3,
107                               uri_extension_size=500, nodeid=None)
108         self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
109
110     def test_readwrite(self):
111         # Let's pretend each share has 100 bytes of data, and that there are
112         # 4 segments (25 bytes each), and 8 shares total. So the three
113         # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
114         # block_hashes) will have 4 leaves and 7 nodes each. The per-share
115         # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
116         # nodes. Furthermore, let's assume the uri_extension is 500 bytes
117         # long. That should make the whole share:
118         #
119         # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
120
121         plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
122                             for i in range(7)]
123         crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
124                             for i in range(7)]
125         block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
126                         for i in range(7)]
127         share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
128                         for i in (1,9,13)]
129         uri_extension = "s" + "E"*498 + "e"
130
131         bw, rb, sharefname = self.make_bucket("test_readwrite", 1414)
132         bp = WriteBucketProxy(rb,
133                               data_size=95,
134                               segment_size=25,
135                               num_segments=4,
136                               num_share_hashes=3,
137                               uri_extension_size=len(uri_extension),
138                               nodeid=None)
139
140         d = bp.start()
141         d.addCallback(lambda res: bp.put_block(0, "a"*25))
142         d.addCallback(lambda res: bp.put_block(1, "b"*25))
143         d.addCallback(lambda res: bp.put_block(2, "c"*25))
144         d.addCallback(lambda res: bp.put_block(3, "d"*20))
145         d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
146         d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
147         d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
148         d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
149         d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
150         d.addCallback(lambda res: bp.close())
151
152         # now read everything back
153         def _start_reading(res):
154             br = BucketReader(self, sharefname)
155             rb = RemoteBucket()
156             rb.target = br
157             rbp = ReadBucketProxy(rb)
158             self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
159
160             d1 = rbp.startIfNecessary()
161             d1.addCallback(lambda res: rbp.get_block(0))
162             d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
163             d1.addCallback(lambda res: rbp.get_block(1))
164             d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
165             d1.addCallback(lambda res: rbp.get_block(2))
166             d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
167             d1.addCallback(lambda res: rbp.get_block(3))
168             d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
169
170             d1.addCallback(lambda res: rbp.get_plaintext_hashes())
171             d1.addCallback(lambda res:
172                            self.failUnlessEqual(res, plaintext_hashes))
173             d1.addCallback(lambda res: rbp.get_crypttext_hashes())
174             d1.addCallback(lambda res:
175                            self.failUnlessEqual(res, crypttext_hashes))
176             d1.addCallback(lambda res: rbp.get_block_hashes())
177             d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
178             d1.addCallback(lambda res: rbp.get_share_hashes())
179             d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
180             d1.addCallback(lambda res: rbp.get_uri_extension())
181             d1.addCallback(lambda res:
182                            self.failUnlessEqual(res, uri_extension))
183
184             return d1
185
186         d.addCallback(_start_reading)
187
188         return d
189
190
191
192 class Server(unittest.TestCase):
193
194     def setUp(self):
195         self.sparent = LoggingServiceParent()
196         self._lease_secret = itertools.count()
197     def tearDown(self):
198         return self.sparent.stopService()
199
200     def workdir(self, name):
201         basedir = os.path.join("storage", "Server", name)
202         return basedir
203
204     def create(self, name, sizelimit=None):
205         workdir = self.workdir(name)
206         ss = StorageServer(workdir, sizelimit)
207         ss.setServiceParent(self.sparent)
208         return ss
209
210     def test_create(self):
211         ss = self.create("test_create")
212
213     def allocate(self, ss, storage_index, sharenums, size):
214         renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
215         cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
216         return ss.remote_allocate_buckets(storage_index,
217                                           renew_secret, cancel_secret,
218                                           sharenums, size, FakeCanary())
219
220     def test_dont_overfill_dirs(self):
221         """
222         This test asserts that if you add a second share whose storage index
223         share lots of leading bits with an extant share (but isn't the exact
224         same storage index), this won't add an entry to the share directory.
225         """
226         ss = self.create("test_dont_overfill_dirs")
227         already, writers = self.allocate(ss, "storageindex", [0], 10)
228         for i, wb in writers.items():
229             wb.remote_write(0, "%10d" % i)
230             wb.remote_close()
231         storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
232                                 "shares")
233         children_of_storedir = set(os.listdir(storedir))
234
235         # Now store another one under another storageindex that has leading
236         # chars the same as the first storageindex.
237         already, writers = self.allocate(ss, "storageindey", [0], 10)
238         for i, wb in writers.items():
239             wb.remote_write(0, "%10d" % i)
240             wb.remote_close()
241         storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
242                                 "shares")
243         new_children_of_storedir = set(os.listdir(storedir))
244         self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
245
246     def test_remove_incoming(self):
247         ss = self.create("test_remove_incoming")
248         already, writers = self.allocate(ss, "vid", range(3), 10)
249         for i,wb in writers.items():
250             wb.remote_write(0, "%10d" % i)
251             wb.remote_close()
252         incomingdir = os.path.dirname(os.path.dirname(os.path.dirname(wb.incominghome)))
253         self.failIf(os.path.exists(incomingdir))
254
255     def test_allocate(self):
256         ss = self.create("test_allocate")
257
258         self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
259
260         canary = FakeCanary()
261         already,writers = self.allocate(ss, "vid", [0,1,2], 75)
262         self.failUnlessEqual(already, set())
263         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
264
265         # while the buckets are open, they should not count as readable
266         self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
267
268         # close the buckets
269         for i,wb in writers.items():
270             wb.remote_write(0, "%25d" % i)
271             wb.remote_close()
272
273         # now they should be readable
274         b = ss.remote_get_buckets("vid")
275         self.failUnlessEqual(set(b.keys()), set([0,1,2]))
276         self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
277
278         # now if we about writing again, the server should offer those three
279         # buckets as already present. It should offer them even if we don't
280         # ask about those specific ones.
281         already,writers = self.allocate(ss, "vid", [2,3,4], 75)
282         self.failUnlessEqual(already, set([0,1,2]))
283         self.failUnlessEqual(set(writers.keys()), set([3,4]))
284
285         # while those two buckets are open for writing, the server should
286         # refuse to offer them to uploaders
287
288         already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
289         self.failUnlessEqual(already, set([0,1,2]))
290         self.failUnlessEqual(set(writers.keys()), set([5]))
291
292     def test_sizelimits(self):
293         ss = self.create("test_sizelimits", 5000)
294         canary = FakeCanary()
295         # a newly created and filled share incurs this much overhead, beyond
296         # the size we request.
297         OVERHEAD = 3*4
298         LEASE_SIZE = 4+32+32+4
299
300         already,writers = self.allocate(ss, "vid1", [0,1,2], 1000)
301         self.failUnlessEqual(len(writers), 3)
302         # now the StorageServer should have 3000 bytes provisionally
303         # allocated, allowing only 2000 more to be claimed
304         self.failUnlessEqual(len(ss._active_writers), 3)
305
306         # allocating 1001-byte shares only leaves room for one
307         already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001)
308         self.failUnlessEqual(len(writers2), 1)
309         self.failUnlessEqual(len(ss._active_writers), 4)
310
311         # we abandon the first set, so their provisional allocation should be
312         # returned
313         del already
314         del writers
315         self.failUnlessEqual(len(ss._active_writers), 1)
316         # now we have a provisional allocation of 1001 bytes
317
318         # and we close the second set, so their provisional allocation should
319         # become real, long-term allocation, and grows to include the
320         # overhead.
321         for bw in writers2.values():
322             bw.remote_write(0, "a"*25)
323             bw.remote_close()
324         del already2
325         del writers2
326         del bw
327         self.failUnlessEqual(len(ss._active_writers), 0)
328
329         allocated = 1001 + OVERHEAD + LEASE_SIZE
330         # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
331         # 5000-1085=3915 free, therefore we can fit 39 100byte shares
332         already3,writers3 = self.allocate(ss,"vid3", range(100), 100)
333         self.failUnlessEqual(len(writers3), 39)
334         self.failUnlessEqual(len(ss._active_writers), 39)
335
336         del already3
337         del writers3
338         self.failUnlessEqual(len(ss._active_writers), 0)
339         ss.disownServiceParent()
340         del ss
341
342         # creating a new StorageServer in the same directory should see the
343         # same usage.
344
345         # metadata that goes into the share file is counted upon share close,
346         # as well as at startup. metadata that goes into other files will not
347         # be counted until the next startup, so if we were creating any
348         # extra-file metadata, the allocation would be more than 'allocated'
349         # and this test would need to be changed.
350         ss = self.create("test_sizelimits", 5000)
351         already4,writers4 = self.allocate(ss, "vid4", range(100), 100)
352         self.failUnlessEqual(len(writers4), 39)
353         self.failUnlessEqual(len(ss._active_writers), 39)
354
355     def test_seek(self):
356         basedir = self.workdir("test_seek_behavior")
357         fileutil.make_dirs(basedir)
358         filename = os.path.join(basedir, "testfile")
359         f = open(filename, "wb")
360         f.write("start")
361         f.close()
362         # mode="w" allows seeking-to-create-holes, but truncates pre-existing
363         # files. mode="a" preserves previous contents but does not allow
364         # seeking-to-create-holes. mode="r+" allows both.
365         f = open(filename, "rb+")
366         f.seek(100)
367         f.write("100")
368         f.close()
369         filelen = os.stat(filename)[stat.ST_SIZE]
370         self.failUnlessEqual(filelen, 100+3)
371         f2 = open(filename, "rb")
372         self.failUnlessEqual(f2.read(5), "start")
373
374
375     def test_leases(self):
376         ss = self.create("test_leases")
377         canary = FakeCanary()
378         sharenums = range(5)
379         size = 100
380
381         rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
382                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
383         already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
384                                                      sharenums, size, canary)
385         self.failUnlessEqual(len(already), 0)
386         self.failUnlessEqual(len(writers), 5)
387         for wb in writers.values():
388             wb.remote_close()
389
390         leases = list(ss.get_leases("si0"))
391         self.failUnlessEqual(len(leases), 1)
392         self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))
393
394         rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
395                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
396         already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
397                                                      sharenums, size, canary)
398         for wb in writers.values():
399             wb.remote_close()
400
401         # take out a second lease on si1
402         rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
403                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
404         already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
405                                                      sharenums, size, canary)
406         self.failUnlessEqual(len(already), 5)
407         self.failUnlessEqual(len(writers), 0)
408
409         leases = list(ss.get_leases("si1"))
410         self.failUnlessEqual(len(leases), 2)
411         self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))
412
413         # check that si0 is readable
414         readers = ss.remote_get_buckets("si0")
415         self.failUnlessEqual(len(readers), 5)
416
417         # renew the first lease. Only the proper renew_secret should work
418         ss.remote_renew_lease("si0", rs0)
419         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
420         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
421
422         # check that si0 is still readable
423         readers = ss.remote_get_buckets("si0")
424         self.failUnlessEqual(len(readers), 5)
425
426         # now cancel it
427         self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
428         self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
429         ss.remote_cancel_lease("si0", cs0)
430
431         # si0 should now be gone
432         readers = ss.remote_get_buckets("si0")
433         self.failUnlessEqual(len(readers), 0)
434         # and the renew should no longer work
435         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
436
437
438         # cancel the first lease on si1, leaving the second in place
439         ss.remote_cancel_lease("si1", cs1)
440         readers = ss.remote_get_buckets("si1")
441         self.failUnlessEqual(len(readers), 5)
442         # the corresponding renew should no longer work
443         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
444
445         leases = list(ss.get_leases("si1"))
446         self.failUnlessEqual(len(leases), 1)
447         self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))
448
449         ss.remote_renew_lease("si1", rs2)
450         # cancelling the second should make it go away
451         ss.remote_cancel_lease("si1", cs2)
452         readers = ss.remote_get_buckets("si1")
453         self.failUnlessEqual(len(readers), 0)
454         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
455         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
456
457         leases = list(ss.get_leases("si1"))
458         self.failUnlessEqual(len(leases), 0)
459
460
461         # test overlapping uploads
462         rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
463                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
464         rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
465                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
466         already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
467                                                      sharenums, size, canary)
468         self.failUnlessEqual(len(already), 0)
469         self.failUnlessEqual(len(writers), 5)
470         already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
471                                                        sharenums, size, canary)
472         self.failUnlessEqual(len(already2), 0)
473         self.failUnlessEqual(len(writers2), 0)
474         for wb in writers.values():
475             wb.remote_close()
476
477         leases = list(ss.get_leases("si3"))
478         self.failUnlessEqual(len(leases), 1)
479
480         already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
481                                                        sharenums, size, canary)
482         self.failUnlessEqual(len(already3), 5)
483         self.failUnlessEqual(len(writers3), 0)
484
485         leases = list(ss.get_leases("si3"))
486         self.failUnlessEqual(len(leases), 2)
487
488
489
490 class MutableServer(unittest.TestCase):
491
492     def setUp(self):
493         self.sparent = LoggingServiceParent()
494         self._lease_secret = itertools.count()
495     def tearDown(self):
496         return self.sparent.stopService()
497
498     def workdir(self, name):
499         basedir = os.path.join("storage", "MutableServer", name)
500         return basedir
501
502     def create(self, name, sizelimit=None):
503         workdir = self.workdir(name)
504         ss = StorageServer(workdir, sizelimit)
505         ss.setServiceParent(self.sparent)
506         ss.setNodeID("\x00" * 32)
507         return ss
508
509     def test_create(self):
510         ss = self.create("test_create")
511
512     def write_enabler(self, we_tag):
513         return hashutil.tagged_hash("we_blah", we_tag)
514
515     def renew_secret(self, tag):
516         return hashutil.tagged_hash("renew_blah", str(tag))
517
518     def cancel_secret(self, tag):
519         return hashutil.tagged_hash("cancel_blah", str(tag))
520
521     def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
522         write_enabler = self.write_enabler(we_tag)
523         renew_secret = self.renew_secret(lease_tag)
524         cancel_secret = self.cancel_secret(lease_tag)
525         rstaraw = ss.remote_slot_testv_and_readv_and_writev
526         testandwritev = dict( [ (shnum, ([], [], None) )
527                          for shnum in sharenums ] )
528         readv = []
529         rc = rstaraw(storage_index,
530                      (write_enabler, renew_secret, cancel_secret),
531                      testandwritev,
532                      readv)
533         (did_write, readv_data) = rc
534         self.failUnless(did_write)
535         self.failUnless(isinstance(readv_data, dict))
536         self.failUnlessEqual(len(readv_data), 0)
537
538     def test_allocate(self):
539         ss = self.create("test_allocate")
540         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
541                                set([0,1,2]), 100)
542
543         read = ss.remote_slot_readv
544         self.failUnlessEqual(read("si1", [0], [(0, 10)]),
545                              {0: [""]})
546         self.failUnlessEqual(read("si1", [], [(0, 10)]),
547                              {0: [""], 1: [""], 2: [""]})
548         self.failUnlessEqual(read("si1", [0], [(100, 10)]),
549                              {0: [""]})
550
551         # try writing to one
552         secrets = ( self.write_enabler("we1"),
553                     self.renew_secret("we1"),
554                     self.cancel_secret("we1") )
555         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
556         write = ss.remote_slot_testv_and_readv_and_writev
557         answer = write("si1", secrets,
558                        {0: ([], [(0,data)], None)},
559                        [])
560         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
561
562         self.failUnlessEqual(read("si1", [0], [(0,20)]),
563                              {0: ["00000000001111111111"]})
564         self.failUnlessEqual(read("si1", [0], [(95,10)]),
565                              {0: ["99999"]})
566         #self.failUnlessEqual(s0.remote_get_length(), 100)
567
568         bad_secrets = ("bad write enabler", secrets[1], secrets[2])
569         f = self.failUnlessRaises(BadWriteEnablerError,
570                                   write, "si1", bad_secrets,
571                                   {}, [])
572         self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)
573
574         # this testv should fail
575         answer = write("si1", secrets,
576                        {0: ([(0, 12, "eq", "444444444444"),
577                              (20, 5, "eq", "22222"),
578                              ],
579                             [(0, "x"*100)],
580                             None),
581                         },
582                        [(0,12), (20,5)],
583                        )
584         self.failUnlessEqual(answer, (False,
585                                       {0: ["000000000011", "22222"],
586                                        1: ["", ""],
587                                        2: ["", ""],
588                                        }))
589         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
590
591         # as should this one
592         answer = write("si1", secrets,
593                        {0: ([(10, 5, "lt", "11111"),
594                              ],
595                             [(0, "x"*100)],
596                             None),
597                         },
598                        [(10,5)],
599                        )
600         self.failUnlessEqual(answer, (False,
601                                       {0: ["11111"],
602                                        1: [""],
603                                        2: [""]},
604                                       ))
605         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
606
607
608     def test_operators(self):
609         # test operators, the data we're comparing is '11111' in all cases.
610         # test both fail+pass, reset data after each one.
611         ss = self.create("test_operators")
612
613         secrets = ( self.write_enabler("we1"),
614                     self.renew_secret("we1"),
615                     self.cancel_secret("we1") )
616         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
617         write = ss.remote_slot_testv_and_readv_and_writev
618         read = ss.remote_slot_readv
619
620         def reset():
621             write("si1", secrets,
622                   {0: ([], [(0,data)], None)},
623                   [])
624
625         reset()
626
627         #  lt
628         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
629                                              ],
630                                             [(0, "x"*100)],
631                                             None,
632                                             )}, [(10,5)])
633         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
634         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
635         self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
636         reset()
637
638         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
639                                              ],
640                                             [(0, "x"*100)],
641                                             None,
642                                             )}, [(10,5)])
643         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
644         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
645         reset()
646
647         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
648                                              ],
649                                             [(0, "y"*100)],
650                                             None,
651                                             )}, [(10,5)])
652         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
653         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
654         reset()
655
656         #  le
657         answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
658                                              ],
659                                             [(0, "x"*100)],
660                                             None,
661                                             )}, [(10,5)])
662         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
663         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
664         reset()
665
666         answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
667                                              ],
668                                             [(0, "y"*100)],
669                                             None,
670                                             )}, [(10,5)])
671         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
672         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
673         reset()
674
675         answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
676                                              ],
677                                             [(0, "y"*100)],
678                                             None,
679                                             )}, [(10,5)])
680         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
681         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
682         reset()
683
684         #  eq
685         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
686                                              ],
687                                             [(0, "x"*100)],
688                                             None,
689                                             )}, [(10,5)])
690         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
691         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
692         reset()
693
694         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
695                                              ],
696                                             [(0, "y"*100)],
697                                             None,
698                                             )}, [(10,5)])
699         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
700         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
701         reset()
702
703         #  ne
704         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
705                                              ],
706                                             [(0, "x"*100)],
707                                             None,
708                                             )}, [(10,5)])
709         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
710         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
711         reset()
712
713         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
714                                              ],
715                                             [(0, "y"*100)],
716                                             None,
717                                             )}, [(10,5)])
718         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
719         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
720         reset()
721
722         #  ge
723         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
724                                              ],
725                                             [(0, "y"*100)],
726                                             None,
727                                             )}, [(10,5)])
728         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
729         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
730         reset()
731
732         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
733                                              ],
734                                             [(0, "y"*100)],
735                                             None,
736                                             )}, [(10,5)])
737         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
738         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
739         reset()
740
741         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
742                                              ],
743                                             [(0, "y"*100)],
744                                             None,
745                                             )}, [(10,5)])
746         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
747         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
748         reset()
749
750         #  gt
751         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
752                                              ],
753                                             [(0, "y"*100)],
754                                             None,
755                                             )}, [(10,5)])
756         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
757         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
758         reset()
759
760         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
761                                              ],
762                                             [(0, "x"*100)],
763                                             None,
764                                             )}, [(10,5)])
765         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
766         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
767         reset()
768
769         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
770                                              ],
771                                             [(0, "x"*100)],
772                                             None,
773                                             )}, [(10,5)])
774         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
775         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
776         reset()
777
778     def test_readv(self):
779         ss = self.create("test_readv")
780         secrets = ( self.write_enabler("we1"),
781                     self.renew_secret("we1"),
782                     self.cancel_secret("we1") )
783         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
784         write = ss.remote_slot_testv_and_readv_and_writev
785         read = ss.remote_slot_readv
786         data = [("%d" % i) * 100 for i in range(3)]
787         rc = write("si1", secrets,
788                    {0: ([], [(0,data[0])], None),
789                     1: ([], [(0,data[1])], None),
790                     2: ([], [(0,data[2])], None),
791                     }, [])
792         self.failUnlessEqual(rc, (True, {}))
793
794         answer = read("si1", [], [(0, 10)])
795         self.failUnlessEqual(answer, {0: ["0"*10],
796                                       1: ["1"*10],
797                                       2: ["2"*10]})
798
799     def compare_leases_without_timestamps(self, a, b):
800         self.failUnlessEqual(len(a), len(b))
801         for i in range(len(a)):
802             (num_a, (ownerid_a, expiration_time_a,
803                    renew_secret_a, cancel_secret_a, nodeid_a)) = a[i]
804             (num_b, (ownerid_b, expiration_time_b,
805                    renew_secret_b, cancel_secret_b, nodeid_b)) = b[i]
806             self.failUnlessEqual( (num_a, ownerid_a, renew_secret_a,
807                                    cancel_secret_a, nodeid_a),
808                                   (num_b, ownerid_b, renew_secret_b,
809                                    cancel_secret_b, nodeid_b) )
810
    def test_leases(self):
        """Exercise lease bookkeeping on a mutable slot.

        Covers: lease creation on first write, lease update on re-write with
        the same secrets and on remote_renew_lease, accumulating multiple
        leases under distinct secrets, lease survival across a container
        expansion, the IndexError raised for unknown renew/cancel secrets,
        and slot removal once the last lease is cancelled.
        """
        ss = self.create("test_leases")
        def secrets(n):
            # one write-enabler shared by every caller, but renew/cancel
            # secrets that vary with n, so each distinct n creates (or
            # addresses) an independent lease
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        # re-allocate the slots and use the same secrets, that should update
        # the lease
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])

        # inspect share 0's file directly: six leases were added, one
        # cancelled, so five remain
        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        all_leases = s0.debug_get_leases()
        self.failUnlessEqual(len(all_leases), 5)

        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        # alone.
        all_leases = s0.debug_get_leases()
        # renewing with a bogus token should prompt an error message

        # TODO: examine the exception thus raised, make sure the old nodeid
        # is present, to provide for share migration
        self.failUnlessRaises(IndexError,
                              ss.remote_renew_lease, "si1",
                              secrets(20)[1])
        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
                              secrets(20)[2])
        # the failed renew/cancel attempts must not have changed anything,
        # timestamps included
        self.failUnlessEqual(all_leases, s0.debug_get_leases())

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.failUnlessEqual(all_leases, s0.debug_get_leases())

        # growing the share keeps the leases intact (timestamps may change)
        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        # now cancel them all
        ss.remote_cancel_lease("si1", secrets(0)[2])
        ss.remote_cancel_lease("si1", secrets(1)[2])
        ss.remote_cancel_lease("si1", secrets(2)[2])
        ss.remote_cancel_lease("si1", secrets(3)[2])

        # the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})
916
917 class Stats(unittest.TestCase):
918
919     def setUp(self):
920         self.sparent = LoggingServiceParent()
921         self._lease_secret = itertools.count()
922     def tearDown(self):
923         return self.sparent.stopService()
924
925     def workdir(self, name):
926         basedir = os.path.join("storage", "Server", name)
927         return basedir
928
929     def create(self, name, sizelimit=None):
930         workdir = self.workdir(name)
931         ss = StorageServer(workdir, sizelimit)
932         ss.setServiceParent(self.sparent)
933         return ss
934
    def test_latencies(self):
        """Check StorageServer's per-operation latency statistics.

        Feeds known latency samples into several categories and verifies the
        mean/median/percentile summary returned by get_latencies().  The
        "allocate" category receives 10000 samples but only 1000 are retained
        (asserted below), so its summary describes the values 9000..9999.
        """
        ss = self.create("test_latencies")
        for i in range(10000):
            ss.add_latency("allocate", 1.0 * i)
        for i in range(1000):
            ss.add_latency("renew", 1.0 * i)
        for i in range(10):
            ss.add_latency("cancel", 2.0 * i)
        ss.add_latency("get", 5.0)

        output = ss.get_latencies()

        # every category we touched (and only those) appears in the output
        self.failUnlessEqual(sorted(output.keys()),
                             sorted(["allocate", "renew", "cancel", "get"]))
        # only the most recent 1000 of the 10000 samples are kept, hence a
        # mean/median near 9500 rather than 5000
        self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
        self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["median"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["90_percentile"] - 9900) < 1)
        self.failUnless(abs(output["allocate"]["95_percentile"] - 9950) < 1)
        self.failUnless(abs(output["allocate"]["99_percentile"] - 9990) < 1)
        self.failUnless(abs(output["allocate"]["999_percentile"] - 9999) < 1)

        # exactly 1000 samples (0..999): nothing dropped
        self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
        self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
        self.failUnless(abs(output["renew"]["median"] - 500) < 1)
        self.failUnless(abs(output["renew"]["90_percentile"] - 900) < 1)
        self.failUnless(abs(output["renew"]["95_percentile"] - 950) < 1)
        self.failUnless(abs(output["renew"]["99_percentile"] - 990) < 1)
        self.failUnless(abs(output["renew"]["999_percentile"] - 999) < 1)

        # a small category (samples 0,2,...,18): high percentiles collapse
        # onto the largest sample
        self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
        self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
        self.failUnless(abs(output["cancel"]["median"] - 10) < 1)
        self.failUnless(abs(output["cancel"]["90_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["95_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["99_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["999_percentile"] - 18) < 1)

        # a single sample: every statistic equals that sample
        self.failUnlessEqual(len(ss.latencies["get"]), 1)
        self.failUnless(abs(output["get"]["mean"] - 5) < 1)
        self.failUnless(abs(output["get"]["median"] - 5) < 1)
        self.failUnless(abs(output["get"]["90_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["95_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["99_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["999_percentile"] - 5) < 1)