from twisted.trial import unittest

from twisted.internet import defer
import time, os.path, stat
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil
from allmydata.storage import BucketWriter, BucketReader, \
     WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \
     storage_index_to_dir
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent
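
# The storage-server entry points accept a foolscap "canary" object. These
# tests never exercise disconnect notification, so a do-nothing stand-in is
# enough.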
class FakeCanary:
    def notifyOnDisconnect(self, *args, **kwargs):
        pass
    def dontNotifyOnDisconnect(self, marker):
        pass

class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final
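
    # The test case passes itself to BucketWriter as the "server" argument,
    # so these no-op stubs absorb the callbacks (close notification and
    # stats counters) that a real StorageServer would receive.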
    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return (owner_num, renew_secret, cancel_secret, expiration_time)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
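
# RemoteBucket wraps a local bucket object and exposes its remote_* methods
# through callRemote(), imitating a foolscap RemoteReference so the bucket
# proxies can be exercised without a network connection.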
class RemoteBucket:

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)

class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return (owner_num, renew_secret, cancel_secret, expiration_time)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              segment_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

    def test_readwrite(self):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the three
        # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
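        #
        # (checking the arithmetic: 0x24 == 36, and
        #  36 + 100 + 224 + 224 + 224 + 102 + 504 == 1414)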

        plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
                            for i in range(7)]
        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket("test_readwrite", 1414)
        bp = WriteBucketProxy(rb,
                              data_size=95,
                              segment_size=25,
                              num_segments=4,
                              num_share_hashes=3,
                              uri_extension_size=len(uri_extension),
                              nodeid=None)

        d = bp.start()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = ReadBucketProxy(rb)
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.startIfNecessary()
            d1.addCallback(lambda res: rbp.get_block(0))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block(1))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block(2))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block(3))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_plaintext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, plaintext_hashes))
            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, sizelimit=None):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, FakeCanary())

    def test_dont_overfill_dirs(self):
        """
        This test asserts that if you add a second share whose storage index
        shares lots of leading bits with an extant share (but isn't the exact
        same storage index), this won't add an entry to the share directory.
        """
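        # ("storageindex" and "storageindey" below differ only in their last
        # byte, so their encoded directory names share a long prefix.)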
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incomingdir = os.path.dirname(os.path.dirname(os.path.dirname(wb.incominghome)))
        self.failIf(os.path.exists(incomingdir))

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})

        canary = FakeCanary()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()

        # now they should be readable
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "vid", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders
        already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([5]))

    def test_sizelimits(self):
        ss = self.create("test_sizelimits", 5000)
        canary = FakeCanary()
        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
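        # (presumably a 4-byte owner number, 32-byte renew secret, 32-byte
        # cancel secret, and 4-byte expiration time: 72 bytes per lease)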

        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grow to include the
        # overhead
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE
        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
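        # (3915 // 100 == 39, so a 40th 100-byte share would not fit)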
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

        # creating a new StorageServer in the same directory should see the
        # same usage.

        # metadata that goes into the share file is counted upon share close,
        # as well as at startup. metadata that goes into other files will not
        # be counted until the next startup, so if we were creating any
        # extra-file metadata, the allocation would be more than 'allocated'
        # and this test would need to be changed.
        ss = self.create("test_sizelimits", 5000)
        already4,writers4 = self.allocate(ss, "vid4", range(100), 100)
        self.failUnlessEqual(len(writers4), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

    def test_seek_behavior(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")

    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # cancel the lease. Only the proper cancel_secret should work
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)

        # cancel the first lease on si1, leaving the second in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second should make it go away
        ss.remote_cancel_lease("si1", cs2)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)

        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name, sizelimit=None):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit)
        ss.setServiceParent(self.sparent)
        ss.setNodeID("\x00" * 32)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
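        # the dict maps each share number to a (test vector, write vector,
        # new-length) tuple; an empty test vector imposes no preconditions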
        testandwritev = dict( [ (shnum, ([], [], None) )
                                for shnum in sharenums ] )
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

    def test_operators(self):
        # test operators, the data we're comparing is '11111' in all cases.
        # test both fail+pass, reset data after each one.
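        # (the comparison operators exercised below: lt, le, eq, ne, ge, gt)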
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110")],
                                            [(0, "x"*100)], None)}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111")],
                                            [(0, "x"*100)], None)}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112")],
                                            [(0, "y"*100)], None)}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110")],
                                            [(0, "x"*100)], None)}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111")],
                                            [(0, "y"*100)], None)}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112")],
                                            [(0, "y"*100)], None)}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112")],
                                            [(0, "x"*100)], None)}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111")],
                                            [(0, "y"*100)], None)}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111")],
                                            [(0, "x"*100)], None)}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112")],
                                            [(0, "y"*100)], None)}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110")],
                                            [(0, "y"*100)], None)}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111")],
                                            [(0, "y"*100)], None)}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112")],
                                            [(0, "x"*100)], None)}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        #  gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110")],
                                            [(0, "y"*100)], None)}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111")],
                                            [(0, "x"*100)], None)}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112")],
                                            [(0, "x"*100)], None)}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

    def compare_leases_without_timestamps(self, a, b):
        self.failUnlessEqual(len(a), len(b))
        for i in range(len(a)):
            (num_a, (ownerid_a, expiration_time_a,
                     renew_secret_a, cancel_secret_a, nodeid_a)) = a[i]
            (num_b, (ownerid_b, expiration_time_b,
                     renew_secret_b, cancel_secret_b, nodeid_b)) = b[i]
            self.failUnlessEqual( (num_a, ownerid_a, renew_secret_a,
                                   cancel_secret_a, nodeid_a),
                                  (num_b, ownerid_b, renew_secret_b,
                                   cancel_secret_b, nodeid_b) )

    def test_leases(self):
        ss = self.create("test_leases")
        def secrets(n):
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        # re-allocate the slots and use the same secrets, that should update
        # the lease
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        # and cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])

        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        all_leases = s0.debug_get_leases()
        self.failUnlessEqual(len(all_leases), 5)

        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        # alone.
        all_leases = s0.debug_get_leases()
        # renewing with a bogus token should prompt an error message

        # TODO: examine the exception thus raised, make sure the old nodeid
        # is present, to provide for share migration
        self.failUnlessRaises(IndexError,
                              ss.remote_renew_lease, "si1",
                              secrets(20)[1])
        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
                              secrets(20)[2])
        self.failUnlessEqual(all_leases, s0.debug_get_leases())

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.failUnlessEqual(all_leases, s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        # now cancel them all
        ss.remote_cancel_lease("si1", secrets(0)[2])
        ss.remote_cancel_lease("si1", secrets(1)[2])
        ss.remote_cancel_lease("si1", secrets(2)[2])
        ss.remote_cancel_lease("si1", secrets(3)[2])

        # the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})

class Stats(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, sizelimit=None):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit)
        ss.setServiceParent(self.sparent)
        return ss

    def test_latencies(self):
        ss = self.create("test_latencies")
        for i in range(10000):
            ss.add_latency("allocate", 1.0 * i)
        for i in range(1000):
            ss.add_latency("renew", 1.0 * i)
        for i in range(10):
            ss.add_latency("cancel", 2.0 * i)
        ss.add_latency("get", 5.0)

        output = ss.get_latencies()

        self.failUnlessEqual(sorted(output.keys()),
                             sorted(["allocate", "renew", "cancel", "get"]))
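        # only the most recent 1000 of the 10000 "allocate" samples appear to
        # be retained (values 9000..9999), which the statistics below reflect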
        self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
        self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["median"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["90_percentile"] - 9900) < 1)
        self.failUnless(abs(output["allocate"]["95_percentile"] - 9950) < 1)
        self.failUnless(abs(output["allocate"]["99_percentile"] - 9990) < 1)
        self.failUnless(abs(output["allocate"]["999_percentile"] - 9999) < 1)

        self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
        self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
        self.failUnless(abs(output["renew"]["median"] - 500) < 1)
        self.failUnless(abs(output["renew"]["90_percentile"] - 900) < 1)
        self.failUnless(abs(output["renew"]["95_percentile"] - 950) < 1)
        self.failUnless(abs(output["renew"]["99_percentile"] - 990) < 1)
        self.failUnless(abs(output["renew"]["999_percentile"] - 999) < 1)

        self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
        self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
        self.failUnless(abs(output["cancel"]["median"] - 10) < 1)
        self.failUnless(abs(output["cancel"]["90_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["95_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["99_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["999_percentile"] - 18) < 1)

        self.failUnlessEqual(len(ss.latencies["get"]), 1)
        self.failUnless(abs(output["get"]["mean"] - 5) < 1)
        self.failUnless(abs(output["get"]["median"] - 5) < 1)
        self.failUnless(abs(output["get"]["90_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["95_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["99_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["999_percentile"] - 5) < 1)