2 from twisted.trial import unittest
4 from twisted.internet import defer
5 import time, os.path, stat
7 from allmydata import interfaces
8 from allmydata.util import fileutil, hashutil
9 from allmydata.storage import BucketWriter, BucketReader, \
10 WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \
12 from allmydata.interfaces import BadWriteEnablerError
13 from allmydata.test.common import LoggingServiceParent
# NOTE(review): these look like method stubs of a fake Foolscap "canary"
# object (the class header and both method bodies fall in the elided line
# ranges of this chunk) -- presumably no-ops used so BucketWriter/allocate
# calls have something to register disconnect callbacks on; confirm against
# the full file.
16 def notifyOnDisconnect(self, *args, **kwargs):
18 def dontNotifyOnDisconnect(self, marker):
# Tests for the low-level immutable-share BucketWriter/BucketReader pair:
# bytes written through BucketWriter.remote_write() land in an incoming file,
# are finalized on close, and read back verbatim via BucketReader.remote_read().
# (Several method bodies here are partially elided in this chunk -- the
# line-number gaps after bucket_writer_closed/add_latency/count are presumably
# `pass` bodies, and the lines before line 39 held the `def make_lease` header.)
21 class Bucket(unittest.TestCase):
22 def make_workdir(self, name):
# Per-test working directory: returns (incoming, final) share paths and
# creates their parent directories.
23 basedir = os.path.join("storage", "Bucket", name)
24 incoming = os.path.join(basedir, "tmp", "bucket")
25 final = os.path.join(basedir, "bucket")
26 fileutil.make_dirs(basedir)
27 fileutil.make_dirs(os.path.join(basedir, "tmp"))
28 return incoming, final
# The next three methods let this TestCase stand in for the StorageServer
# object that BucketWriter calls back into (bodies elided -- likely no-ops).
30 def bucket_writer_closed(self, bw, consumed):
32 def add_latency(self, category, latency):
34 def count(self, name, delta=1):
# Body of make_lease() (its `def` line is elided): builds a throwaway
# (owner_num, renew_secret, cancel_secret, expiration_time) lease tuple.
# NOTE(review): owner_num is assigned on an elided line -- confirm.
39 renew_secret = os.urandom(32)
40 cancel_secret = os.urandom(32)
41 expiration_time = time.time() + 5000
42 return (owner_num, renew_secret, cancel_secret, expiration_time)
44 def test_create(self):
# Write 82 bytes (25+25+25+7) into a 200-byte bucket; the trailing
# BucketWriter argument (a canary) is on an elided continuation line.
45 incoming, final = self.make_workdir("test_create")
46 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
48 bw.remote_write(0, "a"*25)
49 bw.remote_write(25, "b"*25)
50 bw.remote_write(50, "c"*25)
51 bw.remote_write(75, "d"*7)
# (bw.remote_close() presumably happens on the elided line 52.)
54 def test_readwrite(self):
55 incoming, final = self.make_workdir("test_readwrite")
56 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
58 bw.remote_write(0, "a"*25)
59 bw.remote_write(25, "b"*25)
60 bw.remote_write(50, "c"*7) # last block may be short
# After close (elided), a BucketReader over the finalized file must
# return exactly the bytes written at each offset.
64 br = BucketReader(self, bw.finalhome)
65 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
66 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
67 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
# Fake RemoteReference: dispatches callRemote("x", ...) to self.target's
# remote_x(...) and wraps the result in a Deferred, mimicking Foolscap's
# remote-call convention without any networking. The enclosing class header
# and the inner `def _call():` line (original line 72) are elided here.
71 def callRemote(self, methname, *args, **kwargs):
73 meth = getattr(self.target, "remote_" + methname)
74 return meth(*args, **kwargs)
# maybeDeferred ensures callers always get a Deferred, and converts any
# synchronous exception from the target method into a failing Deferred.
75 return defer.maybeDeferred(_call)
# Tests for WriteBucketProxy/ReadBucketProxy: the layer that maps structured
# share contents (data blocks, hash trees, URI extension) onto flat
# offset/length writes against a (fake) remote bucket. Several construction
# lines (WriteBucketProxy keyword arguments, list-comprehension `for` lines,
# the make_lease `def` header) fall in elided line ranges of this chunk.
77 class BucketProxy(unittest.TestCase):
78 def make_bucket(self, name, size):
# Builds a real BucketWriter plus (presumably, on the elided lines
# 85-89) a fake RemoteBucket wrapped around it; appears to return
# (bw, rb, sharefname) given how test_create/test_readwrite unpack it.
79 basedir = os.path.join("storage", "BucketProxy", name)
80 incoming = os.path.join(basedir, "tmp", "bucket")
81 final = os.path.join(basedir, "bucket")
82 fileutil.make_dirs(basedir)
83 fileutil.make_dirs(os.path.join(basedir, "tmp"))
84 bw = BucketWriter(self, incoming, final, size, self.make_lease(),
# Body of make_lease() (header elided) -- same throwaway lease tuple as
# in the Bucket tests above; owner_num is set on an elided line.
92 renew_secret = os.urandom(32)
93 cancel_secret = os.urandom(32)
94 expiration_time = time.time() + 5000
95 return (owner_num, renew_secret, cancel_secret, expiration_time)
# StorageServer-callback stand-ins, as in the Bucket class (bodies elided).
97 def bucket_writer_closed(self, bw, consumed):
99 def add_latency(self, category, latency):
101 def count(self, name, delta=1):
104 def test_create(self):
105 bw, rb, sharefname = self.make_bucket("test_create", 500)
106 bp = WriteBucketProxy(rb,
111 uri_extension_size=500, nodeid=None)
112 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
114 def test_readwrite(self):
115 # Let's pretend each share has 100 bytes of data, and that there are
116 # 4 segments (25 bytes each), and 8 shares total. So the three
117 # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
118 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
119 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
120 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
121 # long. That should make the whole share:
123 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
# The `for i in range(...)` tails of these comprehensions are elided.
125 plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
127 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
129 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
131 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
133 uri_extension = "s" + "E"*498 + "e"
135 bw, rb, sharefname = self.make_bucket("test_readwrite", 1414)
136 bp = WriteBucketProxy(rb,
141 uri_extension_size=len(uri_extension),
# Write every share component through the proxy, in order, then close.
# (The initial `d = bp.start()` or similar is on an elided line.)
145 d.addCallback(lambda res: bp.put_block(0, "a"*25))
146 d.addCallback(lambda res: bp.put_block(1, "b"*25))
147 d.addCallback(lambda res: bp.put_block(2, "c"*25))
148 d.addCallback(lambda res: bp.put_block(3, "d"*20))
149 d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
150 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
151 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
152 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
153 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
154 d.addCallback(lambda res: bp.close())
156 # now read everything back
157 def _start_reading(res):
# Re-wrap the finalized share file in a reader proxy and verify each
# component round-trips exactly. (rb is presumably rebound around the
# new BucketReader on the elided lines 159-160.)
158 br = BucketReader(self, sharefname)
161 rbp = ReadBucketProxy(rb)
162 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
164 d1 = rbp.startIfNecessary()
165 d1.addCallback(lambda res: rbp.get_block(0))
166 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
167 d1.addCallback(lambda res: rbp.get_block(1))
168 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
169 d1.addCallback(lambda res: rbp.get_block(2))
170 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
171 d1.addCallback(lambda res: rbp.get_block(3))
172 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
174 d1.addCallback(lambda res: rbp.get_plaintext_hashes())
175 d1.addCallback(lambda res:
176 self.failUnlessEqual(res, plaintext_hashes))
177 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
178 d1.addCallback(lambda res:
179 self.failUnlessEqual(res, crypttext_hashes))
180 d1.addCallback(lambda res: rbp.get_block_hashes())
181 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
182 d1.addCallback(lambda res: rbp.get_share_hashes())
183 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
184 d1.addCallback(lambda res: rbp.get_uri_extension())
185 d1.addCallback(lambda res:
186 self.failUnlessEqual(res, uri_extension))
# (return d1 is presumably on an elided line.)
190 d.addCallback(_start_reading)
# Tests for the immutable-share side of StorageServer: bucket allocation,
# directory layout, size limits, seek behavior of share files, and the
# renew/cancel lease protocol. setUp/tearDown headers and a number of
# interior lines are elided in this chunk (see line-number gaps).
# NOTE(review): itertools is used below but its import is not visible in
# this chunk -- presumably on an elided import line; confirm.
196 class Server(unittest.TestCase):
# (setUp body -- its `def` header is elided.)
199 self.sparent = LoggingServiceParent()
200 self._lease_secret = itertools.count()
# (tearDown body -- header elided.)
202 return self.sparent.stopService()
204 def workdir(self, name):
205 basedir = os.path.join("storage", "Server", name)
# (return of basedir, possibly joined further, is elided.)
208 def create(self, name, sizelimit=None):
# Build a StorageServer rooted in a per-test workdir and parent it to
# the logging service so tearDown stops it. (return ss is elided.)
209 workdir = self.workdir(name)
210 ss = StorageServer(workdir, sizelimit)
211 ss.setServiceParent(self.sparent)
214 def test_create(self):
215 ss = self.create("test_create")
217 def allocate(self, ss, storage_index, sharenums, size):
# Helper: allocate buckets with fresh, unique lease secrets drawn from
# the per-test itertools counter.
218 renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
219 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
220 return ss.remote_allocate_buckets(storage_index,
221 renew_secret, cancel_secret,
222 sharenums, size, FakeCanary())
224 def test_dont_overfill_dirs(self):
226 This test asserts that if you add a second share whose storage index
227 shares lots of leading bits with an extant share (but isn't the exact
228 same storage index), this won't add an entry to the share directory.
230 ss = self.create("test_dont_overfill_dirs")
231 already, writers = self.allocate(ss, "storageindex", [0], 10)
232 for i, wb in writers.items():
233 wb.remote_write(0, "%10d" % i)
# (wb.remote_close() presumably on the elided line 234.)
235 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
237 children_of_storedir = set(os.listdir(storedir))
239 # Now store another one under another storageindex that has leading
240 # chars the same as the first storageindex.
241 already, writers = self.allocate(ss, "storageindey", [0], 10)
242 for i, wb in writers.items():
243 wb.remote_write(0, "%10d" % i)
245 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
247 new_children_of_storedir = set(os.listdir(storedir))
248 self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
250 def test_remove_incoming(self):
# After all writers close, the whole incoming/ directory tree for the
# upload should have been cleaned up.
251 ss = self.create("test_remove_incoming")
252 already, writers = self.allocate(ss, "vid", range(3), 10)
253 for i,wb in writers.items():
254 wb.remote_write(0, "%10d" % i)
# `wb` here is the last writer from the loop above; walk three levels
# up from its incoming file to the top of the incoming area.
256 incomingdir = os.path.dirname(os.path.dirname(os.path.dirname(wb.incominghome)))
257 self.failIf(os.path.exists(incomingdir))
259 def test_allocate(self):
260 ss = self.create("test_allocate")
262 self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
264 canary = FakeCanary()
265 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
266 self.failUnlessEqual(already, set())
267 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
269 # while the buckets are open, they should not count as readable
270 self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
273 for i,wb in writers.items():
274 wb.remote_write(0, "%25d" % i)
# (remote_close presumably happens on elided lines 275-276.)
277 # now they should be readable
278 b = ss.remote_get_buckets("vid")
279 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
280 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
282 # now if we ask about writing again, the server should offer those three
283 # buckets as already present. It should offer them even if we don't
284 # ask about those specific ones.
285 already,writers = self.allocate(ss, "vid", [2,3,4], 75)
286 self.failUnlessEqual(already, set([0,1,2]))
287 self.failUnlessEqual(set(writers.keys()), set([3,4]))
289 # while those two buckets are open for writing, the server should
290 # refuse to offer them to uploaders
292 already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
293 self.failUnlessEqual(already, set([0,1,2]))
294 self.failUnlessEqual(set(writers.keys()), set([5]))
296 def test_sizelimits(self):
# Exercise the sizelimit accounting: provisional allocations while
# writers are open, real allocation (data + overhead + lease) after
# close, and persistence of the accounting across server restart.
297 ss = self.create("test_sizelimits", 5000)
298 canary = FakeCanary()
299 # a newly created and filled share incurs this much overhead, beyond
300 # the size we request.
# (OVERHEAD is defined on the elided line 301.)
302 LEASE_SIZE = 4+32+32+4
304 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000)
305 self.failUnlessEqual(len(writers), 3)
306 # now the StorageServer should have 3000 bytes provisionally
307 # allocated, allowing only 2000 more to be claimed
308 self.failUnlessEqual(len(ss._active_writers), 3)
310 # allocating 1001-byte shares only leaves room for one
311 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001)
312 self.failUnlessEqual(len(writers2), 1)
313 self.failUnlessEqual(len(ss._active_writers), 4)
315 # we abandon the first set, so their provisional allocation should be
# (abort loop elided, lines 316-318.)
319 self.failUnlessEqual(len(ss._active_writers), 1)
320 # now we have a provisional allocation of 1001 bytes
322 # and we close the second set, so their provisional allocation should
323 # become real, long-term allocation, and grows to include the
325 for bw in writers2.values():
326 bw.remote_write(0, "a"*25)
# (bw.remote_close() presumably on elided line 327.)
331 self.failUnlessEqual(len(ss._active_writers), 0)
333 allocated = 1001 + OVERHEAD + LEASE_SIZE
334 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
335 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
336 already3,writers3 = self.allocate(ss,"vid3", range(100), 100)
337 self.failUnlessEqual(len(writers3), 39)
338 self.failUnlessEqual(len(ss._active_writers), 39)
# (abort of writers3 elided, lines 339-341.)
342 self.failUnlessEqual(len(ss._active_writers), 0)
343 ss.disownServiceParent()
346 # creating a new StorageServer in the same directory should see the
349 # metadata that goes into the share file is counted upon share close,
350 # as well as at startup. metadata that goes into other files will not
351 # be counted until the next startup, so if we were creating any
352 # extra-file metadata, the allocation would be more than 'allocated'
353 # and this test would need to be changed.
354 ss = self.create("test_sizelimits", 5000)
355 already4,writers4 = self.allocate(ss, "vid4", range(100), 100)
356 self.failUnlessEqual(len(writers4), 39)
357 self.failUnlessEqual(len(ss._active_writers), 39)
# (test_seek_behavior `def` header is elided before line 360.)
# Documents the open()-mode requirements for sparse writes to share files.
360 basedir = self.workdir("test_seek_behavior")
361 fileutil.make_dirs(basedir)
362 filename = os.path.join(basedir, "testfile")
363 f = open(filename, "wb")
# (initial write/close elided, lines 364-365.)
366 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
367 # files. mode="a" preserves previous contents but does not allow
368 # seeking-to-create-holes. mode="r+" allows both.
369 f = open(filename, "rb+")
# (seek/write/close elided, lines 370-372.)
373 filelen = os.stat(filename)[stat.ST_SIZE]
374 self.failUnlessEqual(filelen, 100+3)
375 f2 = open(filename, "rb")
376 self.failUnlessEqual(f2.read(5), "start")
379 def test_leases(self):
# End-to-end immutable lease lifecycle: one lease per allocation,
# multiple leases per storage index, renew requires the exact renew
# secret, cancel requires the exact cancel secret, and cancelling the
# last lease deletes the shares.
380 ss = self.create("test_leases")
381 canary = FakeCanary()
# (sharenums/size are defined on elided lines 382-384.)
385 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
386 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
387 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
388 sharenums, size, canary)
389 self.failUnlessEqual(len(already), 0)
390 self.failUnlessEqual(len(writers), 5)
391 for wb in writers.values():
# (wb.remote_close() presumably on elided line 392.)
394 leases = list(ss.get_leases("si0"))
395 self.failUnlessEqual(len(leases), 1)
396 self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))
398 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
399 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
400 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
401 sharenums, size, canary)
402 for wb in writers.values():
405 # take out a second lease on si1
406 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
407 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
408 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
409 sharenums, size, canary)
410 self.failUnlessEqual(len(already), 5)
411 self.failUnlessEqual(len(writers), 0)
413 leases = list(ss.get_leases("si1"))
414 self.failUnlessEqual(len(leases), 2)
415 self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))
417 # check that si0 is readable
418 readers = ss.remote_get_buckets("si0")
419 self.failUnlessEqual(len(readers), 5)
421 # renew the first lease. Only the proper renew_secret should work
422 ss.remote_renew_lease("si0", rs0)
423 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
424 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
426 # check that si0 is still readable
427 readers = ss.remote_get_buckets("si0")
428 self.failUnlessEqual(len(readers), 5)
# Cancel requires the cancel secret, not the renew secret or another
# index's secret.
431 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
432 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
433 ss.remote_cancel_lease("si0", cs0)
435 # si0 should now be gone
436 readers = ss.remote_get_buckets("si0")
437 self.failUnlessEqual(len(readers), 0)
438 # and the renew should no longer work
439 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
442 # cancel the first lease on si1, leaving the second in place
443 ss.remote_cancel_lease("si1", cs1)
444 readers = ss.remote_get_buckets("si1")
445 self.failUnlessEqual(len(readers), 5)
446 # the corresponding renew should no longer work
447 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
449 leases = list(ss.get_leases("si1"))
450 self.failUnlessEqual(len(leases), 1)
451 self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))
453 ss.remote_renew_lease("si1", rs2)
454 # cancelling the second should make it go away
455 ss.remote_cancel_lease("si1", cs2)
456 readers = ss.remote_get_buckets("si1")
457 self.failUnlessEqual(len(readers), 0)
458 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
459 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
461 leases = list(ss.get_leases("si1"))
462 self.failUnlessEqual(len(leases), 0)
465 # test overlapping uploads
466 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
467 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
468 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
469 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
470 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
471 sharenums, size, canary)
472 self.failUnlessEqual(len(already), 0)
473 self.failUnlessEqual(len(writers), 5)
# A second allocation while the first is still being written gets
# neither "already" shares nor new writers.
474 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
475 sharenums, size, canary)
476 self.failUnlessEqual(len(already2), 0)
477 self.failUnlessEqual(len(writers2), 0)
478 for wb in writers.values():
# (wb.remote_close() presumably on elided line 479.)
481 leases = list(ss.get_leases("si3"))
482 self.failUnlessEqual(len(leases), 1)
# Re-allocating after close adds the second lease.
484 already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
485 sharenums, size, canary)
486 self.failUnlessEqual(len(already3), 5)
487 self.failUnlessEqual(len(writers3), 0)
489 leases = list(ss.get_leases("si3"))
490 self.failUnlessEqual(len(leases), 2)
492 def test_readonly(self):
# A readonly server accepts no writers and reports nothing already
# present for a never-written index.
493 workdir = self.workdir("test_readonly")
494 ss = StorageServer(workdir, readonly_storage=True)
495 ss.setServiceParent(self.sparent)
497 canary = FakeCanary()
498 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
499 self.failUnlessEqual(already, set())
500 self.failUnlessEqual(writers, {})
# Tests for mutable-slot storage: the test-and-set write vector API
# (remote_slot_testv_and_readv_and_writev), remote_slot_readv, the
# write-enabler check, comparison operators in test vectors, and mutable
# lease handling. As elsewhere in this chunk, various interior lines are
# elided (see the gaps in the fused line numbering).
504 class MutableServer(unittest.TestCase):
# (setUp body -- header elided.)
507 self.sparent = LoggingServiceParent()
508 self._lease_secret = itertools.count()
# (tearDown body -- header elided.)
510 return self.sparent.stopService()
512 def workdir(self, name):
513 basedir = os.path.join("storage", "MutableServer", name)
# (return elided.)
516 def create(self, name, sizelimit=None):
517 workdir = self.workdir(name)
518 ss = StorageServer(workdir, sizelimit)
519 ss.setServiceParent(self.sparent)
# The server's nodeid (32 NUL bytes here) is recorded alongside each
# slot's write enabler; test_allocate checks the error message below.
520 ss.setNodeID("\x00" * 32)
523 def test_create(self):
524 ss = self.create("test_create")
# Deterministic secret derivation helpers, so tests can re-derive the
# same write-enabler / lease secrets from short tags.
526 def write_enabler(self, we_tag):
527 return hashutil.tagged_hash("we_blah", we_tag)
529 def renew_secret(self, tag):
530 return hashutil.tagged_hash("renew_blah", str(tag))
532 def cancel_secret(self, tag):
533 return hashutil.tagged_hash("cancel_blah", str(tag))
535 def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
# Create empty slots: a writev with empty test vectors and no new data
# for each share number. Asserts the write was accepted and that no
# read data came back (the slots are brand new).
536 write_enabler = self.write_enabler(we_tag)
537 renew_secret = self.renew_secret(lease_tag)
538 cancel_secret = self.cancel_secret(lease_tag)
539 rstaraw = ss.remote_slot_testv_and_readv_and_writev
540 testandwritev = dict( [ (shnum, ([], [], None) )
541 for shnum in sharenums ] )
543 rc = rstaraw(storage_index,
544 (write_enabler, renew_secret, cancel_secret),
# (testandwritev and read-vector arguments are on elided lines.)
547 (did_write, readv_data) = rc
548 self.failUnless(did_write)
549 self.failUnless(isinstance(readv_data, dict))
550 self.failUnlessEqual(len(readv_data), 0)
552 def test_allocate(self):
553 ss = self.create("test_allocate")
554 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
# (sharenums/size arguments elided on lines 555-556.)
557 read = ss.remote_slot_readv
# Fresh slots read back as empty strings, for explicit and for "all"
# ([]) share selections, at any offset.
558 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
560 self.failUnlessEqual(read("si1", [], [(0, 10)]),
561 {0: [""], 1: [""], 2: [""]})
562 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
# data is "0000000000111111111122..." -- ten of each digit, 100 bytes.
566 secrets = ( self.write_enabler("we1"),
567 self.renew_secret("we1"),
568 self.cancel_secret("we1") )
569 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
570 write = ss.remote_slot_testv_and_readv_and_writev
571 answer = write("si1", secrets,
572 {0: ([], [(0,data)], None)},
574 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
576 self.failUnlessEqual(read("si1", [0], [(0,20)]),
577 {0: ["00000000001111111111"]})
578 self.failUnlessEqual(read("si1", [0], [(95,10)]),
580 #self.failUnlessEqual(s0.remote_get_length(), 100)
# A wrong write enabler must be rejected, and the error must identify
# the nodeid that recorded the enabler ("a"*32 base32-ish rendering of
# the NUL nodeid set in create() -- confirm encoding upstream).
582 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
583 f = self.failUnlessRaises(BadWriteEnablerError,
584 write, "si1", bad_secrets,
586 self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)
588 # this testv should fail
589 answer = write("si1", secrets,
590 {0: ([(0, 12, "eq", "444444444444"),
591 (20, 5, "eq", "22222"),
# (remaining test vector, writev and new-length args elided.)
598 self.failUnlessEqual(answer, (False,
599 {0: ["000000000011", "22222"],
# Failed test vectors leave the data untouched.
603 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
606 answer = write("si1", secrets,
607 {0: ([(10, 5, "lt", "11111"),
614 self.failUnlessEqual(answer, (False,
619 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
622 def test_operators(self):
623 # test operators, the data we're comparing is '11111' in all cases.
624 # test both fail+pass, reset data after each one.
625 ss = self.create("test_operators")
627 secrets = ( self.write_enabler("we1"),
628 self.renew_secret("we1"),
629 self.cancel_secret("we1") )
630 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
631 write = ss.remote_slot_testv_and_readv_and_writev
632 read = ss.remote_slot_readv
# (a `reset` helper that rewrites `data` between cases appears to be
# defined on elided lines around 633-641 and invoked between cases.)
635 write("si1", secrets,
636 {0: ([], [(0,data)], None)},
# Each case below: test vector comparing bytes 10..15 ("11111") against
# a probe value with one operator; on success the writev replaces the
# share with "y"*100, on failure the data is unchanged. The writev and
# trailing arguments of each call are on elided continuation lines.
# lt: strictly-less -- fails vs "11110" and "11111", passes vs "11112".
642 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
647 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
648 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
649 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
652 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
657 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
658 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
661 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
666 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
667 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
# le: less-or-equal -- fails vs "11110", passes vs "11111" and "11112".
671 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
676 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
677 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
680 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
685 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
686 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
689 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
694 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
695 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
# eq: equality -- fails vs "11112", passes vs "11111".
699 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
704 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
705 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
708 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
713 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
714 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
# ne: inequality -- fails vs "11111", passes vs "11112".
718 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
723 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
724 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
727 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
732 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
733 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
# ge: greater-or-equal -- passes vs "11110" and "11111", fails vs "11112".
737 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
742 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
743 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
746 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
751 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
752 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
755 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
760 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
761 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
# gt: strictly-greater -- passes vs "11110", fails vs "11111" and "11112".
765 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
770 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
771 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
774 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
779 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
780 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
783 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
788 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
789 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
792 def test_readv(self):
# Reading with sharenums=[] returns every share that exists.
793 ss = self.create("test_readv")
794 secrets = ( self.write_enabler("we1"),
795 self.renew_secret("we1"),
796 self.cancel_secret("we1") )
# NOTE(review): `data` is bound twice; the first binding (line 797) is
# dead -- the list-of-strings binding on line 800 is the one used.
797 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
798 write = ss.remote_slot_testv_and_readv_and_writev
799 read = ss.remote_slot_readv
800 data = [("%d" % i) * 100 for i in range(3)]
801 rc = write("si1", secrets,
802 {0: ([], [(0,data[0])], None),
803 1: ([], [(0,data[1])], None),
804 2: ([], [(0,data[2])], None),
# (closing brace and read-vector argument elided.)
806 self.failUnlessEqual(rc, (True, {}))
808 answer = read("si1", [], [(0, 10)])
809 self.failUnlessEqual(answer, {0: ["0"*10],
# (shares 1 and 2 of the expected dict are on elided lines.)
813 def compare_leases_without_timestamps(self, a, b):
# Lease equality that ignores expiration_time, for tests where renews
# legitimately move the timestamps.
814 self.failUnlessEqual(len(a), len(b))
815 for i in range(len(a)):
816 (num_a, (ownerid_a, expiration_time_a,
817 renew_secret_a, cancel_secret_a, nodeid_a)) = a[i]
818 (num_b, (ownerid_b, expiration_time_b,
819 renew_secret_b, cancel_secret_b, nodeid_b)) = b[i]
820 self.failUnlessEqual( (num_a, ownerid_a, renew_secret_a,
821 cancel_secret_a, nodeid_a),
822 (num_b, ownerid_b, renew_secret_b,
823 cancel_secret_b, nodeid_b) )
825 def test_leases(self):
826 ss = self.create("test_leases")
# (local `def secrets(n):` header is elided before line 828; it derives
# a distinct secret triple per n with a fixed write enabler.)
828 return ( self.write_enabler("we1"),
829 self.renew_secret("we1-%d" % n),
830 self.cancel_secret("we1-%d" % n) )
831 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
832 write = ss.remote_slot_testv_and_readv_and_writev
833 read = ss.remote_slot_readv
834 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
835 self.failUnlessEqual(rc, (True, {}))
837 # create a random non-numeric file in the bucket directory, to
838 # exercise the code that's supposed to ignore those.
839 bucket_dir = os.path.join(self.workdir("test_leases"),
840 "shares", storage_index_to_dir("si1"))
841 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
842 f.write("you ought to be ignoring me\n")
# (f.close() presumably on elided line 843.)
845 # re-allocate the slots and use the same secrets, that should update
847 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
850 ss.remote_renew_lease("si1", secrets(0)[1])
852 # now allocate them with a bunch of different secrets, to trigger the
853 # extended lease code
854 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
855 write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
856 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
857 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
858 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
# Cancel the 6th lease immediately; five remain.
861 ss.remote_cancel_lease("si1", secrets(5)[2])
863 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
864 all_leases = s0.debug_get_leases()
865 self.failUnlessEqual(len(all_leases), 5)
867 # and write enough data to expand the container, forcing the server
869 write("si1", secrets(0),
870 {0: ([], [(0,data)], 200), },
873 # read back the leases, make sure they're still intact.
874 self.compare_leases_without_timestamps(all_leases,
875 s0.debug_get_leases())
877 ss.remote_renew_lease("si1", secrets(0)[1])
878 ss.remote_renew_lease("si1", secrets(1)[1])
879 ss.remote_renew_lease("si1", secrets(2)[1])
880 ss.remote_renew_lease("si1", secrets(3)[1])
881 ss.remote_renew_lease("si1", secrets(4)[1])
882 self.compare_leases_without_timestamps(all_leases,
883 s0.debug_get_leases())
884 # get a new copy of the leases, with the current timestamps. Reading
885 # data and failing to renew/cancel leases should leave the timestamps
887 all_leases = s0.debug_get_leases()
888 # renewing with a bogus token should prompt an error message
890 # TODO: examine the exception thus raised, make sure the old nodeid
891 # is present, to provide for share migration
892 self.failUnlessRaises(IndexError,
893 ss.remote_renew_lease, "si1",
895 # same for cancelling
896 self.failUnlessRaises(IndexError,
897 ss.remote_cancel_lease, "si1",
899 self.failUnlessEqual(all_leases, s0.debug_get_leases())
901 # reading shares should not modify the timestamp
902 read("si1", [], [(0,200)])
903 self.failUnlessEqual(all_leases, s0.debug_get_leases())
905 write("si1", secrets(0),
906 {0: ([], [(200, "make me bigger")], None)}, [])
907 self.compare_leases_without_timestamps(all_leases,
908 s0.debug_get_leases())
910 write("si1", secrets(0),
911 {0: ([], [(500, "make me really bigger")], None)}, [])
912 self.compare_leases_without_timestamps(all_leases,
913 s0.debug_get_leases())
915 # now cancel them all
916 ss.remote_cancel_lease("si1", secrets(0)[2])
917 ss.remote_cancel_lease("si1", secrets(1)[2])
918 ss.remote_cancel_lease("si1", secrets(2)[2])
919 ss.remote_cancel_lease("si1", secrets(3)[2])
921 # the slot should still be there
922 remaining_shares = read("si1", [], [(0,10)])
923 self.failUnlessEqual(len(remaining_shares), 1)
924 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
926 ss.remote_cancel_lease("si1", secrets(4)[2])
927 # now the slot should be gone
928 no_shares = read("si1", [], [(0,10)])
929 self.failUnlessEqual(no_shares, {})
# Tests for StorageServer latency-sample bookkeeping: add_latency() keeps a
# bounded sample list per category (at most 1000 entries, oldest dropped)
# and get_latencies() reports mean/median/percentile summaries per category.
931 class Stats(unittest.TestCase):
# (setUp body -- header elided.)
934 self.sparent = LoggingServiceParent()
935 self._lease_secret = itertools.count()
# (tearDown body -- header elided.)
937 return self.sparent.stopService()
939 def workdir(self, name):
# NOTE(review): reuses the "Server" directory name rather than "Stats"
# -- presumably copied from the Server class above; harmless for these
# tests but confirm intent.
940 basedir = os.path.join("storage", "Server", name)
943 def create(self, name, sizelimit=None):
944 workdir = self.workdir(name)
945 ss = StorageServer(workdir, sizelimit)
946 ss.setServiceParent(self.sparent)
# (return ss elided.)
949 def test_latencies(self):
950 ss = self.create("test_latencies")
# 10000 "allocate" samples: only the newest 1000 (9000..9999) survive.
951 for i in range(10000):
952 ss.add_latency("allocate", 1.0 * i)
953 for i in range(1000):
954 ss.add_latency("renew", 1.0 * i)
# ("cancel" loop header -- presumably `for i in range(10):` -- elided.)
956 ss.add_latency("cancel", 2.0 * i)
957 ss.add_latency("get", 5.0)
959 output = ss.get_latencies()
961 self.failUnlessEqual(sorted(output.keys()),
962 sorted(["allocate", "renew", "cancel", "get"]))
963 self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
# Samples 9000..9999: mean/median 9499.5 (within 1 of 9500), and the
# Nth percentile falls on sample 9000 + N*10.
964 self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
965 self.failUnless(abs(output["allocate"]["median"] - 9500) < 1)
966 self.failUnless(abs(output["allocate"]["90_percentile"] - 9900) < 1)
967 self.failUnless(abs(output["allocate"]["95_percentile"] - 9950) < 1)
968 self.failUnless(abs(output["allocate"]["99_percentile"] - 9990) < 1)
969 self.failUnless(abs(output["allocate"]["999_percentile"] - 9999) < 1)
971 self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
972 self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
973 self.failUnless(abs(output["renew"]["median"] - 500) < 1)
974 self.failUnless(abs(output["renew"]["90_percentile"] - 900) < 1)
975 self.failUnless(abs(output["renew"]["95_percentile"] - 950) < 1)
976 self.failUnless(abs(output["renew"]["99_percentile"] - 990) < 1)
977 self.failUnless(abs(output["renew"]["999_percentile"] - 999) < 1)
# With only 10 samples (0,2,...,18) every high percentile clamps to the
# largest sample, 18.
979 self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
980 self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
981 self.failUnless(abs(output["cancel"]["median"] - 10) < 1)
982 self.failUnless(abs(output["cancel"]["90_percentile"] - 18) < 1)
983 self.failUnless(abs(output["cancel"]["95_percentile"] - 18) < 1)
984 self.failUnless(abs(output["cancel"]["99_percentile"] - 18) < 1)
985 self.failUnless(abs(output["cancel"]["999_percentile"] - 18) < 1)
# A single sample: every statistic is that sample.
987 self.failUnlessEqual(len(ss.latencies["get"]), 1)
988 self.failUnless(abs(output["get"]["mean"] - 5) < 1)
989 self.failUnless(abs(output["get"]["median"] - 5) < 1)
990 self.failUnless(abs(output["get"]["90_percentile"] - 5) < 1)
991 self.failUnless(abs(output["get"]["95_percentile"] - 5) < 1)
992 self.failUnless(abs(output["get"]["99_percentile"] - 5) < 1)
993 self.failUnless(abs(output["get"]["999_percentile"] - 5) < 1)