2 from twisted.trial import unittest
4 from twisted.internet import defer
5 import time, os.path, stat
7 from allmydata import interfaces
8 from allmydata.util import fileutil, hashutil
9 from allmydata.storage import BucketWriter, BucketReader, \
10 WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \
12 from allmydata.interfaces import BadWriteEnablerError
13 from allmydata.test.common import LoggingServiceParent
# NOTE(review): fragment of a fake Foolscap canary used by the storage tests.
# The method bodies (and the enclosing class statement) are missing from this
# view of the file; only the signatures are visible.
16 def notifyOnDisconnect(self, *args, **kwargs):
# presumably records/ignores a disconnect callback registration -- TODO confirm
18 def dontNotifyOnDisconnect(self, marker):
# Tests for the low-level share-file writer/reader (BucketWriter/BucketReader).
# The test case itself doubles as the "server" argument passed to BucketWriter,
# so it provides the bucket_writer_closed/add_latency callbacks below.
21 class Bucket(unittest.TestCase):
22 def make_workdir(self, name):
# Build a per-test directory tree and return the (incoming, final) share
# paths used by BucketWriter: writes land in tmp/ and move to final on close.
23 basedir = os.path.join("storage", "Bucket", name)
24 incoming = os.path.join(basedir, "tmp", "bucket")
25 final = os.path.join(basedir, "bucket")
26 fileutil.make_dirs(basedir)
27 fileutil.make_dirs(os.path.join(basedir, "tmp"))
28 return incoming, final
# No-op server callbacks: BucketWriter notifies its server on close, and the
# StorageServer interface expects add_latency; the tests don't care.
30 def bucket_writer_closed(self, bw, consumed):
32 def add_latency(self, category, latency):
# NOTE(review): the `def make_lease(self):` line (and owner_num setup) is
# missing from this view; the visible body builds a fresh
# (owner_num, renew_secret, cancel_secret, expiration_time) lease tuple.
37 renew_secret = os.urandom(32)
38 cancel_secret = os.urandom(32)
39 expiration_time = time.time() + 5000
40 return (owner_num, renew_secret, cancel_secret, expiration_time)
42 def test_create(self):
# Create a 200-byte bucket and write it in four sequential chunks
# (25+25+25+7 = 82 bytes); the close/finish call is missing from this view.
43 incoming, final = self.make_workdir("test_create")
44 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
46 bw.remote_write(0, "a"*25)
47 bw.remote_write(25, "b"*25)
48 bw.remote_write(50, "c"*25)
49 bw.remote_write(75, "d"*7)
52 def test_readwrite(self):
# Write three chunks through a BucketWriter, then read the finalized share
# back through a BucketReader and verify the bytes round-trip.
53 incoming, final = self.make_workdir("test_readwrite")
54 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
56 bw.remote_write(0, "a"*25)
57 bw.remote_write(25, "b"*25)
58 bw.remote_write(50, "c"*7) # last block may be short
# (the bw.remote_close() call is missing from this view)
# now read from it
62 br = BucketReader(self, bw.finalhome)
63 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
64 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
65 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
# NOTE(review): method of a fake remote-reference class whose class statement
# is missing from this view. It forwards callRemote("foo", ...) to
# self.target.remote_foo(...), wrapped in a Deferred like a real Foolscap call.
69 def callRemote(self, methname, *args, **kwargs):
# (the inner `def _call():` line is missing from this view)
71 meth = getattr(self.target, "remote_" + methname)
72 return meth(*args, **kwargs)
73 return defer.maybeDeferred(_call)
# Tests for the Write/ReadBucketProxy layer, which serializes share layout
# (blocks, hash trees, URI extension) on top of a BucketWriter/BucketReader.
75 class BucketProxy(unittest.TestCase):
76 def make_bucket(self, name, size):
# Build a workdir and a BucketWriter of the given size; the missing lines
# presumably wrap it in a fake RemoteBucket and return
# (bw, rb, final_share_filename) -- TODO confirm against test_create below.
77 basedir = os.path.join("storage", "BucketProxy", name)
78 incoming = os.path.join(basedir, "tmp", "bucket")
79 final = os.path.join(basedir, "bucket")
80 fileutil.make_dirs(basedir)
81 fileutil.make_dirs(os.path.join(basedir, "tmp"))
82 bw = BucketWriter(self, incoming, final, size, self.make_lease(),
# NOTE(review): the `def make_lease(self):` line is missing from this view;
# the visible body builds a fresh lease tuple, as in the Bucket class above.
90 renew_secret = os.urandom(32)
91 cancel_secret = os.urandom(32)
92 expiration_time = time.time() + 5000
93 return (owner_num, renew_secret, cancel_secret, expiration_time)
# No-op server callbacks required by BucketWriter/StorageServer interface.
95 def bucket_writer_closed(self, bw, consumed):
97 def add_latency(self, category, latency):
100 def test_create(self):
# A freshly constructed WriteBucketProxy must provide the
# IStorageBucketWriter interface (several constructor args are missing
# from this view between lines 102 and 107).
101 bw, rb, sharefname = self.make_bucket("test_create", 500)
102 bp = WriteBucketProxy(rb,
107 uri_extension_size=500, nodeid=None)
108 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
110 def test_readwrite(self):
# Round-trip an entire laid-out share through WriteBucketProxy, then read
# every field back through ReadBucketProxy and compare.
111 # Let's pretend each share has 100 bytes of data, and that there are
112 # 4 segments (25 bytes each), and 8 shares total. So the three
113 # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
114 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
115 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
116 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
117 # long. That should make the whole share:
119 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
# (each list comprehension's `for i in range(...)]` tail line is missing
# from this view)
121 plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
123 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
125 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
127 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
129 uri_extension = "s" + "E"*498 + "e"
131 bw, rb, sharefname = self.make_bucket("test_readwrite", 1414)
132 bp = WriteBucketProxy(rb,
137 uri_extension_size=len(uri_extension),
# Queue up all the writes; `d` is presumably started by bp.start() on a
# missing line -- TODO confirm.
141 d.addCallback(lambda res: bp.put_block(0, "a"*25))
142 d.addCallback(lambda res: bp.put_block(1, "b"*25))
143 d.addCallback(lambda res: bp.put_block(2, "c"*25))
144 d.addCallback(lambda res: bp.put_block(3, "d"*20))
145 d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
146 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
147 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
148 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
149 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
150 d.addCallback(lambda res: bp.close())
152 # now read everything back
153 def _start_reading(res):
154 br = BucketReader(self, sharefname)
157 rbp = ReadBucketProxy(rb)
158 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
160 d1 = rbp.startIfNecessary()
161 d1.addCallback(lambda res: rbp.get_block(0))
162 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
163 d1.addCallback(lambda res: rbp.get_block(1))
164 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
165 d1.addCallback(lambda res: rbp.get_block(2))
166 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
167 d1.addCallback(lambda res: rbp.get_block(3))
168 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
170 d1.addCallback(lambda res: rbp.get_plaintext_hashes())
171 d1.addCallback(lambda res:
172 self.failUnlessEqual(res, plaintext_hashes))
173 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
174 d1.addCallback(lambda res:
175 self.failUnlessEqual(res, crypttext_hashes))
176 d1.addCallback(lambda res: rbp.get_block_hashes())
177 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
178 d1.addCallback(lambda res: rbp.get_share_hashes())
179 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
180 d1.addCallback(lambda res: rbp.get_uri_extension())
181 d1.addCallback(lambda res:
182 self.failUnlessEqual(res, uri_extension))
186 d.addCallback(_start_reading)
# Tests for the immutable-share StorageServer API (allocate/get buckets,
# leases, size limits).
192 class Server(unittest.TestCase):
# (setUp/tearDown def lines are missing from this view; the visible bodies
# build a LoggingServiceParent and a lease-secret counter, and stop the
# service parent on teardown)
195 self.sparent = LoggingServiceParent()
196 self._lease_secret = itertools.count()
198 return self.sparent.stopService()
200 def workdir(self, name):
201 basedir = os.path.join("storage", "Server", name)
204 def create(self, name, sizelimit=None):
# Build a StorageServer in a per-test workdir, attach it to the service
# parent, and (presumably, on a missing line) return it.
205 workdir = self.workdir(name)
206 ss = StorageServer(workdir, sizelimit)
207 ss.setServiceParent(self.sparent)
210 def test_create(self):
211 ss = self.create("test_create")
213 def allocate(self, ss, storage_index, sharenums, size):
# Helper: allocate buckets with fresh (deterministic, counter-derived)
# renew/cancel secrets and a fake canary.
214 renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
215 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
216 return ss.remote_allocate_buckets(storage_index,
217 renew_secret, cancel_secret,
218 sharenums, size, FakeCanary())
220 def test_dont_overfill_dirs(self):
222 This test asserts that if you add a second share whose storage index
223 shares lots of leading bits with an extant share (but isn't the exact
224 same storage index), this won't add an entry to the share directory.
226 ss = self.create("test_dont_overfill_dirs")
227 already, writers = self.allocate(ss, "storageindex", [0], 10)
228 for i, wb in writers.items():
229 wb.remote_write(0, "%10d" % i)
# (the wb.remote_close() call is missing from this view)
# Snapshot the top-level shares/ directory contents.
231 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
233 children_of_storedir = set(os.listdir(storedir))
235 # Now store another one under another storageindex that has leading
236 # chars the same as the first storageindex.
237 already, writers = self.allocate(ss, "storageindey", [0], 10)
238 for i, wb in writers.items():
239 wb.remote_write(0, "%10d" % i)
241 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
# The set of top-level children must be unchanged: both indexes share the
# same prefix directory.
243 new_children_of_storedir = set(os.listdir(storedir))
244 self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
246 def test_remove_incoming(self):
# After shares are written and closed, the incoming/ staging directory
# tree should have been cleaned up entirely.
247 ss = self.create("test_remove_incoming")
248 already, writers = self.allocate(ss, "vid", range(3), 10)
249 for i,wb in writers.items():
250 wb.remote_write(0, "%10d" % i)
# (the wb.remote_close() call is missing from this view)
# Walk up three levels from the last writer's incoming path to reach the
# incoming/ root, which should no longer exist.
252 incomingdir = os.path.dirname(os.path.dirname(os.path.dirname(wb.incominghome)))
253 self.failIf(os.path.exists(incomingdir))
255 def test_allocate(self):
256 ss = self.create("test_allocate")
258 self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
260 canary = FakeCanary()
261 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
262 self.failUnlessEqual(already, set())
263 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
265 # while the buckets are open, they should not count as readable
266 self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
# Write and (on a missing line, presumably) close each share.
269 for i,wb in writers.items():
270 wb.remote_write(0, "%25d" % i)
273 # now they should be readable
274 b = ss.remote_get_buckets("vid")
275 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
276 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
278 # now if we ask about writing again, the server should offer those three
279 # buckets as already present. It should offer them even if we don't
280 # ask about those specific ones.
281 already,writers = self.allocate(ss, "vid", [2,3,4], 75)
282 self.failUnlessEqual(already, set([0,1,2]))
283 self.failUnlessEqual(set(writers.keys()), set([3,4]))
285 # while those two buckets are open for writing, the server should
286 # refuse to offer them to uploaders
288 already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
289 self.failUnlessEqual(already, set([0,1,2]))
290 self.failUnlessEqual(set(writers.keys()), set([5]))
292 def test_sizelimits(self):
# Exercise the sizelimit accounting: provisional allocations while
# writers are open, real consumption after close, reclamation on abort,
# and persistence of the accounting across a server restart.
293 ss = self.create("test_sizelimits", 5000)
294 canary = FakeCanary()
295 # a newly created and filled share incurs this much overhead, beyond
296 # the size we request.
# (the OVERHEAD constant line is missing from this view)
298 LEASE_SIZE = 4+32+32+4
300 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000)
301 self.failUnlessEqual(len(writers), 3)
302 # now the StorageServer should have 3000 bytes provisionally
303 # allocated, allowing only 2000 more to be claimed
304 self.failUnlessEqual(len(ss._active_writers), 3)
306 # allocating 1001-byte shares only leaves room for one
307 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001)
308 self.failUnlessEqual(len(writers2), 1)
309 self.failUnlessEqual(len(ss._active_writers), 4)
311 # we abandon the first set, so their provisional allocation should be
# (the abort loop is missing from this view)
315 self.failUnlessEqual(len(ss._active_writers), 1)
316 # now we have a provisional allocation of 1001 bytes
318 # and we close the second set, so their provisional allocation should
319 # become real, long-term allocation, and grows to include the
# (remainder of comment and close loop partly missing from this view)
321 for bw in writers2.values():
322 bw.remote_write(0, "a"*25)
327 self.failUnlessEqual(len(ss._active_writers), 0)
329 allocated = 1001 + OVERHEAD + LEASE_SIZE
330 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
331 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
332 already3,writers3 = self.allocate(ss,"vid3", range(100), 100)
333 self.failUnlessEqual(len(writers3), 39)
334 self.failUnlessEqual(len(ss._active_writers), 39)
# (an abort/cleanup loop is missing from this view)
338 self.failUnlessEqual(len(ss._active_writers), 0)
339 ss.disownServiceParent()
# (a `del ss` line is presumably among the missing lines -- TODO confirm)
342 # creating a new StorageServer in the same directory should see the
# (rest of comment missing from this view)
345 # metadata that goes into the share file is counted upon share close,
346 # as well as at startup. metadata that goes into other files will not
347 # be counted until the next startup, so if we were creating any
348 # extra-file metadata, the allocation would be more than 'allocated'
349 # and this test would need to be changed.
350 ss = self.create("test_sizelimits", 5000)
351 already4,writers4 = self.allocate(ss, "vid4", range(100), 100)
352 self.failUnlessEqual(len(writers4), 39)
353 self.failUnlessEqual(len(ss._active_writers), 39)
# NOTE(review): the `def test_seek_behavior(self):` line is missing from this
# view. The visible body documents/checks the platform file-mode semantics the
# storage code relies on: "rb+" allows seeking past EOF to create sparse
# extensions without truncating existing contents.
356 basedir = self.workdir("test_seek_behavior")
357 fileutil.make_dirs(basedir)
358 filename = os.path.join(basedir, "testfile")
359 f = open(filename, "wb")
# (initial write and close are missing from this view)
362 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
363 # files. mode="a" preserves previous contents but does not allow
364 # seeking-to-create-holes. mode="r+" allows both.
365 f = open(filename, "rb+")
# (seek/write/close lines are missing from this view)
369 filelen = os.stat(filename)[stat.ST_SIZE]
370 self.failUnlessEqual(filelen, 100+3)
371 f2 = open(filename, "rb")
372 self.failUnlessEqual(f2.read(5), "start")
375 def test_leases(self):
# Lease lifecycle on immutable shares: add, renew (correct secret only),
# cancel (last cancel deletes the shares), and overlapping uploads.
376 ss = self.create("test_leases")
377 canary = FakeCanary()
# (sharenums/size setup lines are missing from this view; len checks below
# imply 5 shares)
381 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
382 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
383 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
384 sharenums, size, canary)
385 self.failUnlessEqual(len(already), 0)
386 self.failUnlessEqual(len(writers), 5)
387 for wb in writers.values():
# (wb.remote_close() is missing from this view)
390 leases = list(ss.get_leases("si0"))
391 self.failUnlessEqual(len(leases), 1)
392 self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))
394 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
395 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
396 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
397 sharenums, size, canary)
398 for wb in writers.values():
# (wb.remote_close() is missing from this view)
401 # take out a second lease on si1
402 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
403 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
404 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
405 sharenums, size, canary)
406 self.failUnlessEqual(len(already), 5)
407 self.failUnlessEqual(len(writers), 0)
409 leases = list(ss.get_leases("si1"))
410 self.failUnlessEqual(len(leases), 2)
411 self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))
413 # check that si0 is readable
414 readers = ss.remote_get_buckets("si0")
415 self.failUnlessEqual(len(readers), 5)
417 # renew the first lease. Only the proper renew_secret should work
418 ss.remote_renew_lease("si0", rs0)
419 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
420 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
422 # check that si0 is still readable
423 readers = ss.remote_get_buckets("si0")
424 self.failUnlessEqual(len(readers), 5)
# Cancelling requires the cancel secret, not the renew secret or another
# index's secrets.
427 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
428 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
429 ss.remote_cancel_lease("si0", cs0)
431 # si0 should now be gone
432 readers = ss.remote_get_buckets("si0")
433 self.failUnlessEqual(len(readers), 0)
434 # and the renew should no longer work
435 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
438 # cancel the first lease on si1, leaving the second in place
439 ss.remote_cancel_lease("si1", cs1)
440 readers = ss.remote_get_buckets("si1")
441 self.failUnlessEqual(len(readers), 5)
442 # the corresponding renew should no longer work
443 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
445 leases = list(ss.get_leases("si1"))
446 self.failUnlessEqual(len(leases), 1)
447 self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))
449 ss.remote_renew_lease("si1", rs2)
450 # cancelling the second should make it go away
451 ss.remote_cancel_lease("si1", cs2)
452 readers = ss.remote_get_buckets("si1")
453 self.failUnlessEqual(len(readers), 0)
454 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
455 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
457 leases = list(ss.get_leases("si1"))
458 self.failUnlessEqual(len(leases), 0)
461 # test overlapping uploads
462 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
463 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
464 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
465 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
466 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
467 sharenums, size, canary)
468 self.failUnlessEqual(len(already), 0)
469 self.failUnlessEqual(len(writers), 5)
# A second allocation while the first is still in progress gets nothing:
# neither "already present" nor new writers.
470 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
471 sharenums, size, canary)
472 self.failUnlessEqual(len(already2), 0)
473 self.failUnlessEqual(len(writers2), 0)
474 for wb in writers.values():
# (wb.remote_close() is missing from this view)
477 leases = list(ss.get_leases("si3"))
478 self.failUnlessEqual(len(leases), 1)
# After the first upload completes, re-allocating with the second secrets
# reports the shares present and adds a second lease.
480 already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
481 sharenums, size, canary)
482 self.failUnlessEqual(len(already3), 5)
483 self.failUnlessEqual(len(writers3), 0)
485 leases = list(ss.get_leases("si3"))
486 self.failUnlessEqual(len(leases), 2)
# Tests for the mutable-slot API (slot_testv_and_readv_and_writev / slot_readv)
# of the StorageServer.
490 class MutableServer(unittest.TestCase):
# (setUp/tearDown def lines are missing from this view)
493 self.sparent = LoggingServiceParent()
494 self._lease_secret = itertools.count()
496 return self.sparent.stopService()
498 def workdir(self, name):
499 basedir = os.path.join("storage", "MutableServer", name)
502 def create(self, name, sizelimit=None):
# Like Server.create, but also pins the node ID so the BadWriteEnablerError
# message (checked in test_allocate) is deterministic.
503 workdir = self.workdir(name)
504 ss = StorageServer(workdir, sizelimit)
505 ss.setServiceParent(self.sparent)
506 ss.setNodeID("\x00" * 32)
509 def test_create(self):
510 ss = self.create("test_create")
# Deterministic secret derivation helpers, so tests can re-derive the same
# write-enabler / lease secrets from a tag.
512 def write_enabler(self, we_tag):
513 return hashutil.tagged_hash("we_blah", we_tag)
515 def renew_secret(self, tag):
516 return hashutil.tagged_hash("renew_blah", str(tag))
518 def cancel_secret(self, tag):
519 return hashutil.tagged_hash("cancel_blah", str(tag))
521 def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
# Create empty mutable slots via an empty-testv write, and sanity-check
# the (did_write, readv_data) result.
522 write_enabler = self.write_enabler(we_tag)
523 renew_secret = self.renew_secret(lease_tag)
524 cancel_secret = self.cancel_secret(lease_tag)
525 rstaraw = ss.remote_slot_testv_and_readv_and_writev
526 testandwritev = dict( [ (shnum, ([], [], None) )
527 for shnum in sharenums ] )
529 rc = rstaraw(storage_index,
530 (write_enabler, renew_secret, cancel_secret),
# (remaining call arguments are missing from this view)
533 (did_write, readv_data) = rc
534 self.failUnless(did_write)
535 self.failUnless(isinstance(readv_data, dict))
536 self.failUnlessEqual(len(readv_data), 0)
538 def test_allocate(self):
# Allocate empty slots, then exercise reads of empty shares, a real write,
# the write-enabler check, and failing test vectors (which must leave the
# data untouched).
539 ss = self.create("test_allocate")
540 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
# (remaining allocate() args are missing from this view)
543 read = ss.remote_slot_readv
544 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
# (expected-value argument missing from this view)
546 self.failUnlessEqual(read("si1", [], [(0, 10)]),
547 {0: [""], 1: [""], 2: [""]})
548 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
# (expected-value argument missing from this view)
# try writing to one
552 secrets = ( self.write_enabler("we1"),
553 self.renew_secret("we1"),
554 self.cancel_secret("we1") )
555 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
556 write = ss.remote_slot_testv_and_readv_and_writev
557 answer = write("si1", secrets,
558 {0: ([], [(0,data)], None)},
560 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
562 self.failUnlessEqual(read("si1", [0], [(0,20)]),
563 {0: ["00000000001111111111"]})
564 self.failUnlessEqual(read("si1", [0], [(95,10)]),
566 #self.failUnlessEqual(s0.remote_get_length(), 100)
# A wrong write enabler must be rejected, and the error must name the
# recording nodeid (set to 32 NULs in create(), reported as 'a'*32 here --
# presumably base32-encoded; TODO confirm).
568 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
569 f = self.failUnlessRaises(BadWriteEnablerError,
570 write, "si1", bad_secrets,
572 self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)
574 # this testv should fail
575 answer = write("si1", secrets,
576 {0: ([(0, 12, "eq", "444444444444"),
577 (20, 5, "eq", "22222"),
# (rest of the test/write vectors are missing from this view)
584 self.failUnlessEqual(answer, (False,
585 {0: ["000000000011", "22222"],
# (rest of the expected readback is missing from this view)
589 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
# A second failing testv, again leaving the data unchanged.
592 answer = write("si1", secrets,
593 {0: ([(10, 5, "lt", "11111"),
600 self.failUnlessEqual(answer, (False,
605 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
608 def test_operators(self):
609 # test operators, the data we're comparing is '11111' in all cases.
610 # test both fail+pass, reset data after each one.
611 ss = self.create("test_operators")
613 secrets = ( self.write_enabler("we1"),
614 self.renew_secret("we1"),
615 self.cancel_secret("we1") )
616 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
617 write = ss.remote_slot_testv_and_readv_and_writev
618 read = ss.remote_slot_readv
# (a `_reset` helper that rewrites `data` between cases is presumably among
# the missing lines -- TODO confirm)
621 write("si1", secrets,
622 {0: ([], [(0,data)], None)},
# lt: bytes at [10:15] are "11111"; each case's write vector (which writes
# "y"*100 on success) is on lines missing from this view.
628 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
633 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
634 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
635 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
638 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
643 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
644 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
647 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
652 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
653 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
# le
657 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
662 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
663 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
666 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
671 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
672 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
675 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
680 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
681 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
# eq
685 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
690 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
691 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
694 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
699 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
700 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
# ne
704 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
709 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
710 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
713 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
718 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
719 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
# ge
723 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
728 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
729 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
732 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
737 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
738 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
741 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
746 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
747 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
# gt
751 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
756 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
757 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
760 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
765 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
766 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
769 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
774 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
775 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
778 def test_readv(self):
# A readv with an empty share list must return the requested ranges from
# every share in the slot.
779 ss = self.create("test_readv")
780 secrets = ( self.write_enabler("we1"),
781 self.renew_secret("we1"),
782 self.cancel_secret("we1") )
# (this first `data` is immediately shadowed by the per-share list below)
783 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
784 write = ss.remote_slot_testv_and_readv_and_writev
785 read = ss.remote_slot_readv
786 data = [("%d" % i) * 100 for i in range(3)]
787 rc = write("si1", secrets,
788 {0: ([], [(0,data[0])], None),
789 1: ([], [(0,data[1])], None),
790 2: ([], [(0,data[2])], None),
792 self.failUnlessEqual(rc, (True, {}))
794 answer = read("si1", [], [(0, 10)])
795 self.failUnlessEqual(answer, {0: ["0"*10],
# (expected values for shares 1 and 2 are missing from this view)
def compare_leases_without_timestamps(self, a, b):
    """Assert that lease lists *a* and *b* match, ignoring expiration times.

    Each entry is (lease_number, (ownerid, expiration_time, renew_secret,
    cancel_secret, nodeid)); everything except expiration_time must be
    equal, pairwise and in order.
    """
    self.failUnlessEqual(len(a), len(b))
    for lease_a, lease_b in zip(a, b):
        (num_a, (owner_a, _when_a, renew_a, cancel_a, node_a)) = lease_a
        (num_b, (owner_b, _when_b, renew_b, cancel_b, node_b)) = lease_b
        self.failUnlessEqual((num_a, owner_a, renew_a, cancel_a, node_a),
                             (num_b, owner_b, renew_b, cancel_b, node_b))
811 def test_leases(self):
# Lease lifecycle on mutable slots: leases accumulate per distinct secret,
# survive container expansion, are renewed/cancelled only with the right
# secrets, and the slot disappears when the last lease is cancelled.
812 ss = self.create("test_leases")
# NOTE(review): the `def secrets(n):` line is missing from this view; the
# visible body derives a per-n (write_enabler, renew, cancel) tuple.
814 return ( self.write_enabler("we1"),
815 self.renew_secret("we1-%d" % n),
816 self.cancel_secret("we1-%d" % n) )
817 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
818 write = ss.remote_slot_testv_and_readv_and_writev
819 read = ss.remote_slot_readv
820 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
821 self.failUnlessEqual(rc, (True, {}))
823 # create a random non-numeric file in the bucket directory, to
824 # exercise the code that's supposed to ignore those.
825 bucket_dir = os.path.join(self.workdir("test_leases"),
826 "shares", storage_index_to_dir("si1"))
827 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
828 f.write("you ought to be ignoring me\n")
# (f.close() is missing from this view)
831 # re-allocate the slots and use the same secrets, that should update
# (rest of comment missing from this view)
833 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
# (a lease-count check is presumably among the missing lines)
836 ss.remote_renew_lease("si1", secrets(0)[1])
838 # now allocate them with a bunch of different secrets, to trigger the
839 # extended lease code
840 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
841 write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
842 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
843 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
844 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
# Cancel one of the six leases, leaving five.
847 ss.remote_cancel_lease("si1", secrets(5)[2])
849 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
850 all_leases = s0.debug_get_leases()
851 self.failUnlessEqual(len(all_leases), 5)
853 # and write enough data to expand the container, forcing the server
# (rest of comment missing from this view)
855 write("si1", secrets(0),
856 {0: ([], [(0,data)], 200), },
859 # read back the leases, make sure they're still intact.
860 self.compare_leases_without_timestamps(all_leases,
861 s0.debug_get_leases())
863 ss.remote_renew_lease("si1", secrets(0)[1])
864 ss.remote_renew_lease("si1", secrets(1)[1])
865 ss.remote_renew_lease("si1", secrets(2)[1])
866 ss.remote_renew_lease("si1", secrets(3)[1])
867 ss.remote_renew_lease("si1", secrets(4)[1])
868 self.compare_leases_without_timestamps(all_leases,
869 s0.debug_get_leases())
870 # get a new copy of the leases, with the current timestamps. Reading
871 # data and failing to renew/cancel leases should leave the timestamps
# (rest of comment missing from this view)
873 all_leases = s0.debug_get_leases()
874 # renewing with a bogus token should prompt an error message
876 # TODO: examine the exception thus raised, make sure the old nodeid
877 # is present, to provide for share migration
878 self.failUnlessRaises(IndexError,
879 ss.remote_renew_lease, "si1",
# (the bogus-secret argument is missing from this view)
881 # same for cancelling
882 self.failUnlessRaises(IndexError,
883 ss.remote_cancel_lease, "si1",
885 self.failUnlessEqual(all_leases, s0.debug_get_leases())
887 # reading shares should not modify the timestamp
888 read("si1", [], [(0,200)])
889 self.failUnlessEqual(all_leases, s0.debug_get_leases())
# In-place and extending writes keep the lease set intact (timestamps may
# change, so the timestamp-insensitive comparison is used).
891 write("si1", secrets(0),
892 {0: ([], [(200, "make me bigger")], None)}, [])
893 self.compare_leases_without_timestamps(all_leases,
894 s0.debug_get_leases())
896 write("si1", secrets(0),
897 {0: ([], [(500, "make me really bigger")], None)}, [])
898 self.compare_leases_without_timestamps(all_leases,
899 s0.debug_get_leases())
901 # now cancel them all
902 ss.remote_cancel_lease("si1", secrets(0)[2])
903 ss.remote_cancel_lease("si1", secrets(1)[2])
904 ss.remote_cancel_lease("si1", secrets(2)[2])
905 ss.remote_cancel_lease("si1", secrets(3)[2])
907 # the slot should still be there
908 remaining_shares = read("si1", [], [(0,10)])
909 self.failUnlessEqual(len(remaining_shares), 1)
910 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
912 ss.remote_cancel_lease("si1", secrets(4)[2])
913 # now the slot should be gone
914 no_shares = read("si1", [], [(0,10)])
915 self.failUnlessEqual(no_shares, {})
# Tests for the StorageServer latency-statistics machinery.
917 class Stats(unittest.TestCase):
# (setUp/tearDown def lines are missing from this view)
920 self.sparent = LoggingServiceParent()
921 self._lease_secret = itertools.count()
923 return self.sparent.stopService()
925 def workdir(self, name):
# NOTE(review): reuses the "Server" directory name rather than "Stats" --
# looks copied from the Server class; harmless for these tests but worth
# confirming it's intentional.
926 basedir = os.path.join("storage", "Server", name)
929 def create(self, name, sizelimit=None):
930 workdir = self.workdir(name)
931 ss = StorageServer(workdir, sizelimit)
932 ss.setServiceParent(self.sparent)
935 def test_latencies(self):
# Feed known latency samples into several categories and check the
# computed mean/median/percentile statistics, including the behaviors
# for a bounded sample buffer (allocate), a small sample (cancel), and a
# single sample (get).
936 ss = self.create("test_latencies")
937 for i in range(10000):
938 ss.add_latency("allocate", 1.0 * i)
939 for i in range(1000):
940 ss.add_latency("renew", 1.0 * i)
# (the `for i in range(10):` line for "cancel" is missing from this view)
942 ss.add_latency("cancel", 2.0 * i)
943 ss.add_latency("get", 5.0)
945 output = ss.get_latencies()
947 self.failUnlessEqual(sorted(output.keys()),
948 sorted(["allocate", "renew", "cancel", "get"]))
# Only the most recent 1000 allocate samples are retained, so the stats
# cover values 9000..9999.
949 self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
950 self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
951 self.failUnless(abs(output["allocate"]["median"] - 9500) < 1)
952 self.failUnless(abs(output["allocate"]["90_percentile"] - 9900) < 1)
953 self.failUnless(abs(output["allocate"]["95_percentile"] - 9950) < 1)
954 self.failUnless(abs(output["allocate"]["99_percentile"] - 9990) < 1)
955 self.failUnless(abs(output["allocate"]["999_percentile"] - 9999) < 1)
957 self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
958 self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
959 self.failUnless(abs(output["renew"]["median"] - 500) < 1)
960 self.failUnless(abs(output["renew"]["90_percentile"] - 900) < 1)
961 self.failUnless(abs(output["renew"]["95_percentile"] - 950) < 1)
962 self.failUnless(abs(output["renew"]["99_percentile"] - 990) < 1)
963 self.failUnless(abs(output["renew"]["999_percentile"] - 999) < 1)
# With only 10 samples, the upper percentiles all clamp to the max (18).
965 self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
966 self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
967 self.failUnless(abs(output["cancel"]["median"] - 10) < 1)
968 self.failUnless(abs(output["cancel"]["90_percentile"] - 18) < 1)
969 self.failUnless(abs(output["cancel"]["95_percentile"] - 18) < 1)
970 self.failUnless(abs(output["cancel"]["99_percentile"] - 18) < 1)
971 self.failUnless(abs(output["cancel"]["999_percentile"] - 18) < 1)
# A single sample: every statistic equals that sample.
973 self.failUnlessEqual(len(ss.latencies["get"]), 1)
974 self.failUnless(abs(output["get"]["mean"] - 5) < 1)
975 self.failUnless(abs(output["get"]["median"] - 5) < 1)
976 self.failUnless(abs(output["get"]["90_percentile"] - 5) < 1)
977 self.failUnless(abs(output["get"]["95_percentile"] - 5) < 1)
978 self.failUnless(abs(output["get"]["99_percentile"] - 5) < 1)
979 self.failUnless(abs(output["get"]["999_percentile"] - 5) < 1)