2 from twisted.trial import unittest
4 from twisted.internet import defer
5 import time, os.path, stat
7 from allmydata import interfaces
8 from allmydata.util import fileutil, hashutil
9 from allmydata.storage import BucketWriter, BucketReader, \
10 WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \
11 storage_index_to_dir, DataTooLargeError
12 from allmydata.interfaces import BadWriteEnablerError
13 from allmydata.test.common import LoggingServiceParent
    def __init__(self, ignore_disconnectors=False):
        """Fake substitute for a Foolscap canary in tests.

        :param ignore_disconnectors: stored on ``self.ignore``; presumably
            controls whether unknown markers are tolerated when
            de-registering -- confirm against the rest of the class.
        """
        self.ignore = ignore_disconnectors
        # marker -> (callback, args, kwargs), populated by
        # notifyOnDisconnect and removed by dontNotifyOnDisconnect
        self.disconnectors = {}
21 def notifyOnDisconnect(self, f, *args, **kwargs):
25 self.disconnectors[m] = (f, args, kwargs)
27 def dontNotifyOnDisconnect(self, marker):
30 del self.disconnectors[marker]
32 class FakeStatsProvider:
33 def count(self, name, delta=1):
35 def register_producer(self, producer):
38 class Bucket(unittest.TestCase):
39 def make_workdir(self, name):
40 basedir = os.path.join("storage", "Bucket", name)
41 incoming = os.path.join(basedir, "tmp", "bucket")
42 final = os.path.join(basedir, "bucket")
43 fileutil.make_dirs(basedir)
44 fileutil.make_dirs(os.path.join(basedir, "tmp"))
45 return incoming, final
47 def bucket_writer_closed(self, bw, consumed):
49 def add_latency(self, category, latency):
51 def count(self, name, delta=1):
56 renew_secret = os.urandom(32)
57 cancel_secret = os.urandom(32)
58 expiration_time = time.time() + 5000
59 return (owner_num, renew_secret, cancel_secret, expiration_time)
61 def test_create(self):
62 incoming, final = self.make_workdir("test_create")
63 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
65 bw.remote_write(0, "a"*25)
66 bw.remote_write(25, "b"*25)
67 bw.remote_write(50, "c"*25)
68 bw.remote_write(75, "d"*7)
71 def test_readwrite(self):
72 incoming, final = self.make_workdir("test_readwrite")
73 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
75 bw.remote_write(0, "a"*25)
76 bw.remote_write(25, "b"*25)
77 bw.remote_write(50, "c"*7) # last block may be short
81 br = BucketReader(self, bw.finalhome)
82 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
83 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
84 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
88 def callRemote(self, methname, *args, **kwargs):
90 meth = getattr(self.target, "remote_" + methname)
91 return meth(*args, **kwargs)
92 return defer.maybeDeferred(_call)
94 class BucketProxy(unittest.TestCase):
95 def make_bucket(self, name, size):
96 basedir = os.path.join("storage", "BucketProxy", name)
97 incoming = os.path.join(basedir, "tmp", "bucket")
98 final = os.path.join(basedir, "bucket")
99 fileutil.make_dirs(basedir)
100 fileutil.make_dirs(os.path.join(basedir, "tmp"))
101 bw = BucketWriter(self, incoming, final, size, self.make_lease(),
107 def make_lease(self):
109 renew_secret = os.urandom(32)
110 cancel_secret = os.urandom(32)
111 expiration_time = time.time() + 5000
112 return (owner_num, renew_secret, cancel_secret, expiration_time)
114 def bucket_writer_closed(self, bw, consumed):
116 def add_latency(self, category, latency):
118 def count(self, name, delta=1):
121 def test_create(self):
122 bw, rb, sharefname = self.make_bucket("test_create", 500)
123 bp = WriteBucketProxy(rb,
128 uri_extension_size=500, nodeid=None)
129 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
131 def test_readwrite(self):
132 # Let's pretend each share has 100 bytes of data, and that there are
133 # 4 segments (25 bytes each), and 8 shares total. So the three
134 # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
135 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
136 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
137 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
138 # long. That should make the whole share:
140 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
142 plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
144 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
146 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
148 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
150 uri_extension = "s" + "E"*498 + "e"
152 bw, rb, sharefname = self.make_bucket("test_readwrite", 1414)
153 bp = WriteBucketProxy(rb,
158 uri_extension_size=len(uri_extension),
162 d.addCallback(lambda res: bp.put_block(0, "a"*25))
163 d.addCallback(lambda res: bp.put_block(1, "b"*25))
164 d.addCallback(lambda res: bp.put_block(2, "c"*25))
165 d.addCallback(lambda res: bp.put_block(3, "d"*20))
166 d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
167 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
168 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
169 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
170 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
171 d.addCallback(lambda res: bp.close())
173 # now read everything back
174 def _start_reading(res):
175 br = BucketReader(self, sharefname)
178 rbp = ReadBucketProxy(rb, peerid="abc")
179 self.failUnless("to peer" in repr(rbp))
180 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
182 d1 = rbp.startIfNecessary()
183 d1.addCallback(lambda res: rbp.startIfNecessary()) # idempotent
184 d1.addCallback(lambda res: rbp.get_block(0))
185 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
186 d1.addCallback(lambda res: rbp.get_block(1))
187 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
188 d1.addCallback(lambda res: rbp.get_block(2))
189 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
190 d1.addCallback(lambda res: rbp.get_block(3))
191 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
193 d1.addCallback(lambda res: rbp.get_plaintext_hashes())
194 d1.addCallback(lambda res:
195 self.failUnlessEqual(res, plaintext_hashes))
196 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
197 d1.addCallback(lambda res:
198 self.failUnlessEqual(res, crypttext_hashes))
199 d1.addCallback(lambda res: rbp.get_block_hashes())
200 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
201 d1.addCallback(lambda res: rbp.get_share_hashes())
202 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
203 d1.addCallback(lambda res: rbp.get_uri_extension())
204 d1.addCallback(lambda res:
205 self.failUnlessEqual(res, uri_extension))
209 d.addCallback(_start_reading)
215 class Server(unittest.TestCase):
218 self.sparent = LoggingServiceParent()
219 self._lease_secret = itertools.count()
221 return self.sparent.stopService()
223 def workdir(self, name):
224 basedir = os.path.join("storage", "Server", name)
227 def create(self, name, sizelimit=None):
228 workdir = self.workdir(name)
229 ss = StorageServer(workdir, sizelimit,
230 stats_provider=FakeStatsProvider())
231 ss.setServiceParent(self.sparent)
    def test_create(self):
        # Smoke test: constructing and parenting a StorageServer (via
        # self.create) should succeed without raising.
        ss = self.create("test_create")
237 def allocate(self, ss, storage_index, sharenums, size, canary=None):
238 renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
239 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
241 canary = FakeCanary()
242 return ss.remote_allocate_buckets(storage_index,
243 renew_secret, cancel_secret,
244 sharenums, size, canary)
246 def test_dont_overfill_dirs(self):
248 This test asserts that if you add a second share whose storage index
249 share lots of leading bits with an extant share (but isn't the exact
250 same storage index), this won't add an entry to the share directory.
252 ss = self.create("test_dont_overfill_dirs")
253 already, writers = self.allocate(ss, "storageindex", [0], 10)
254 for i, wb in writers.items():
255 wb.remote_write(0, "%10d" % i)
257 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
259 children_of_storedir = set(os.listdir(storedir))
261 # Now store another one under another storageindex that has leading
262 # chars the same as the first storageindex.
263 already, writers = self.allocate(ss, "storageindey", [0], 10)
264 for i, wb in writers.items():
265 wb.remote_write(0, "%10d" % i)
267 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
269 new_children_of_storedir = set(os.listdir(storedir))
270 self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
272 def test_remove_incoming(self):
273 ss = self.create("test_remove_incoming")
274 already, writers = self.allocate(ss, "vid", range(3), 10)
275 for i,wb in writers.items():
276 wb.remote_write(0, "%10d" % i)
278 incomingdir = os.path.dirname(os.path.dirname(os.path.dirname(wb.incominghome)))
279 self.failIf(os.path.exists(incomingdir))
281 def test_allocate(self):
282 ss = self.create("test_allocate")
284 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
286 canary = FakeCanary()
287 already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
288 self.failUnlessEqual(already, set())
289 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
291 # while the buckets are open, they should not count as readable
292 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
295 for i,wb in writers.items():
296 wb.remote_write(0, "%25d" % i)
298 # aborting a bucket that was already closed is a no-op
301 # now they should be readable
302 b = ss.remote_get_buckets("allocate")
303 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
304 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
306 # now if we ask about writing again, the server should offer those
307 # three buckets as already present. It should offer them even if we
308 # don't ask about those specific ones.
309 already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
310 self.failUnlessEqual(already, set([0,1,2]))
311 self.failUnlessEqual(set(writers.keys()), set([3,4]))
313 # while those two buckets are open for writing, the server should
314 # refuse to offer them to uploaders
316 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
317 self.failUnlessEqual(already2, set([0,1,2]))
318 self.failUnlessEqual(set(writers2.keys()), set([5]))
320 # aborting the writes should remove the tempfiles
321 for i,wb in writers2.items():
323 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
324 self.failUnlessEqual(already2, set([0,1,2]))
325 self.failUnlessEqual(set(writers2.keys()), set([5]))
327 for i,wb in writers2.items():
329 for i,wb in writers.items():
332 def test_disconnect(self):
333 # simulate a disconnection
334 ss = self.create("test_disconnect")
335 canary = FakeCanary()
336 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
337 self.failUnlessEqual(already, set())
338 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
339 for (f,args,kwargs) in canary.disconnectors.values():
344 # that ought to delete the incoming shares
345 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
346 self.failUnlessEqual(already, set())
347 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
349 def test_sizelimits(self):
350 ss = self.create("test_sizelimits", 5000)
351 # a newly created and filled share incurs this much overhead, beyond
352 # the size we request.
354 LEASE_SIZE = 4+32+32+4
355 canary = FakeCanary(True)
356 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
357 self.failUnlessEqual(len(writers), 3)
358 # now the StorageServer should have 3000 bytes provisionally
359 # allocated, allowing only 2000 more to be claimed
360 self.failUnlessEqual(len(ss._active_writers), 3)
362 # allocating 1001-byte shares only leaves room for one
363 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
364 self.failUnlessEqual(len(writers2), 1)
365 self.failUnlessEqual(len(ss._active_writers), 4)
367 # we abandon the first set, so their provisional allocation should be
371 self.failUnlessEqual(len(ss._active_writers), 1)
372 # now we have a provisional allocation of 1001 bytes
374 # and we close the second set, so their provisional allocation should
375 # become real, long-term allocation, and grows to include the
377 for bw in writers2.values():
378 bw.remote_write(0, "a"*25)
383 self.failUnlessEqual(len(ss._active_writers), 0)
385 allocated = 1001 + OVERHEAD + LEASE_SIZE
386 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
387 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
388 already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
389 self.failUnlessEqual(len(writers3), 39)
390 self.failUnlessEqual(len(ss._active_writers), 39)
394 self.failUnlessEqual(len(ss._active_writers), 0)
395 ss.disownServiceParent()
398 # creating a new StorageServer in the same directory should see the
401 # metadata that goes into the share file is counted upon share close,
402 # as well as at startup. metadata that goes into other files will not
403 # be counted until the next startup, so if we were creating any
404 # extra-file metadata, the allocation would be more than 'allocated'
405 # and this test would need to be changed.
406 ss = self.create("test_sizelimits", 5000)
407 already4,writers4 = self.allocate(ss, "vid4", range(100), 100, canary)
408 self.failUnlessEqual(len(writers4), 39)
409 self.failUnlessEqual(len(ss._active_writers), 39)
412 basedir = self.workdir("test_seek_behavior")
413 fileutil.make_dirs(basedir)
414 filename = os.path.join(basedir, "testfile")
415 f = open(filename, "wb")
418 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
419 # files. mode="a" preserves previous contents but does not allow
420 # seeking-to-create-holes. mode="r+" allows both.
421 f = open(filename, "rb+")
425 filelen = os.stat(filename)[stat.ST_SIZE]
426 self.failUnlessEqual(filelen, 100+3)
427 f2 = open(filename, "rb")
428 self.failUnlessEqual(f2.read(5), "start")
431 def test_leases(self):
432 ss = self.create("test_leases")
433 canary = FakeCanary()
437 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
438 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
439 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
440 sharenums, size, canary)
441 self.failUnlessEqual(len(already), 0)
442 self.failUnlessEqual(len(writers), 5)
443 for wb in writers.values():
446 leases = list(ss.get_leases("si0"))
447 self.failUnlessEqual(len(leases), 1)
448 self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))
450 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
451 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
452 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
453 sharenums, size, canary)
454 for wb in writers.values():
457 # take out a second lease on si1
458 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
459 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
460 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
461 sharenums, size, canary)
462 self.failUnlessEqual(len(already), 5)
463 self.failUnlessEqual(len(writers), 0)
465 leases = list(ss.get_leases("si1"))
466 self.failUnlessEqual(len(leases), 2)
467 self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))
469 # check that si0 is readable
470 readers = ss.remote_get_buckets("si0")
471 self.failUnlessEqual(len(readers), 5)
473 # renew the first lease. Only the proper renew_secret should work
474 ss.remote_renew_lease("si0", rs0)
475 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
476 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
478 # check that si0 is still readable
479 readers = ss.remote_get_buckets("si0")
480 self.failUnlessEqual(len(readers), 5)
483 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
484 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
485 ss.remote_cancel_lease("si0", cs0)
487 # si0 should now be gone
488 readers = ss.remote_get_buckets("si0")
489 self.failUnlessEqual(len(readers), 0)
490 # and the renew should no longer work
491 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
494 # cancel the first lease on si1, leaving the second in place
495 ss.remote_cancel_lease("si1", cs1)
496 readers = ss.remote_get_buckets("si1")
497 self.failUnlessEqual(len(readers), 5)
498 # the corresponding renew should no longer work
499 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
501 leases = list(ss.get_leases("si1"))
502 self.failUnlessEqual(len(leases), 1)
503 self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))
505 ss.remote_renew_lease("si1", rs2)
506 # cancelling the second should make it go away
507 ss.remote_cancel_lease("si1", cs2)
508 readers = ss.remote_get_buckets("si1")
509 self.failUnlessEqual(len(readers), 0)
510 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
511 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
513 leases = list(ss.get_leases("si1"))
514 self.failUnlessEqual(len(leases), 0)
517 # test overlapping uploads
518 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
519 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
520 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
521 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
522 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
523 sharenums, size, canary)
524 self.failUnlessEqual(len(already), 0)
525 self.failUnlessEqual(len(writers), 5)
526 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
527 sharenums, size, canary)
528 self.failUnlessEqual(len(already2), 0)
529 self.failUnlessEqual(len(writers2), 0)
530 for wb in writers.values():
533 leases = list(ss.get_leases("si3"))
534 self.failUnlessEqual(len(leases), 1)
536 already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
537 sharenums, size, canary)
538 self.failUnlessEqual(len(already3), 5)
539 self.failUnlessEqual(len(writers3), 0)
541 leases = list(ss.get_leases("si3"))
542 self.failUnlessEqual(len(leases), 2)
544 def test_readonly(self):
545 workdir = self.workdir("test_readonly")
546 ss = StorageServer(workdir, readonly_storage=True)
547 ss.setServiceParent(self.sparent)
549 canary = FakeCanary()
550 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
551 self.failUnlessEqual(already, set())
552 self.failUnlessEqual(writers, {})
554 def test_discard(self):
555 # discard is really only used for other tests, but we test it anyways
556 workdir = self.workdir("test_discard")
557 ss = StorageServer(workdir, discard_storage=True)
558 ss.setServiceParent(self.sparent)
560 canary = FakeCanary()
561 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
562 self.failUnlessEqual(already, set())
563 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
564 for i,wb in writers.items():
565 wb.remote_write(0, "%25d" % i)
567 # since we discard the data, the shares should be present but sparse.
568 # Since we write with some seeks, the data we read back will be all
570 b = ss.remote_get_buckets("vid")
571 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
572 self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
576 class MutableServer(unittest.TestCase):
579 self.sparent = LoggingServiceParent()
580 self._lease_secret = itertools.count()
582 return self.sparent.stopService()
584 def workdir(self, name):
585 basedir = os.path.join("storage", "MutableServer", name)
588 def create(self, name, sizelimit=None):
589 workdir = self.workdir(name)
590 ss = StorageServer(workdir, sizelimit)
591 ss.setServiceParent(self.sparent)
592 ss.setNodeID("\x00" * 32)
    def test_create(self):
        # Smoke test: creating a StorageServer configured for mutable-share
        # tests (via self.create) should succeed without raising.
        ss = self.create("test_create")
    def write_enabler(self, we_tag):
        """Return a deterministic write-enabler secret derived from *we_tag*.

        Deterministic derivation lets tests recreate the same secret later
        to authorize further writes to the same slot.
        """
        return hashutil.tagged_hash("we_blah", we_tag)
    def renew_secret(self, tag):
        """Return a deterministic lease renew-secret derived from *tag*."""
        return hashutil.tagged_hash("renew_blah", str(tag))
    def cancel_secret(self, tag):
        """Return a deterministic lease cancel-secret derived from *tag*."""
        return hashutil.tagged_hash("cancel_blah", str(tag))
607 def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
608 write_enabler = self.write_enabler(we_tag)
609 renew_secret = self.renew_secret(lease_tag)
610 cancel_secret = self.cancel_secret(lease_tag)
611 rstaraw = ss.remote_slot_testv_and_readv_and_writev
612 testandwritev = dict( [ (shnum, ([], [], None) )
613 for shnum in sharenums ] )
615 rc = rstaraw(storage_index,
616 (write_enabler, renew_secret, cancel_secret),
619 (did_write, readv_data) = rc
620 self.failUnless(did_write)
621 self.failUnless(isinstance(readv_data, dict))
622 self.failUnlessEqual(len(readv_data), 0)
624 def test_container_size(self):
625 ss = self.create("test_container_size")
626 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
628 rstaraw = ss.remote_slot_testv_and_readv_and_writev
629 secrets = ( self.write_enabler("we1"),
630 self.renew_secret("we1"),
631 self.cancel_secret("we1") )
632 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
633 answer = rstaraw("si1", secrets,
634 {0: ([], [(0,data)], len(data)+12)},
636 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
638 # trying to make the container too large will raise an exception
639 TOOBIG = MutableShareFile.MAX_SIZE + 10
640 self.failUnlessRaises(DataTooLargeError,
641 rstaraw, "si1", secrets,
642 {0: ([], [(0,data)], TOOBIG)},
645 # it should be possible to make the container smaller, although at
646 # the moment this doesn't actually affect the share
647 answer = rstaraw("si1", secrets,
648 {0: ([], [(0,data)], len(data)+8)},
650 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
652 def test_allocate(self):
653 ss = self.create("test_allocate")
654 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
657 read = ss.remote_slot_readv
658 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
660 self.failUnlessEqual(read("si1", [], [(0, 10)]),
661 {0: [""], 1: [""], 2: [""]})
662 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
666 secrets = ( self.write_enabler("we1"),
667 self.renew_secret("we1"),
668 self.cancel_secret("we1") )
669 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
670 write = ss.remote_slot_testv_and_readv_and_writev
671 answer = write("si1", secrets,
672 {0: ([], [(0,data)], None)},
674 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
676 self.failUnlessEqual(read("si1", [0], [(0,20)]),
677 {0: ["00000000001111111111"]})
678 self.failUnlessEqual(read("si1", [0], [(95,10)]),
680 #self.failUnlessEqual(s0.remote_get_length(), 100)
682 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
683 f = self.failUnlessRaises(BadWriteEnablerError,
684 write, "si1", bad_secrets,
686 self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)
688 # this testv should fail
689 answer = write("si1", secrets,
690 {0: ([(0, 12, "eq", "444444444444"),
691 (20, 5, "eq", "22222"),
698 self.failUnlessEqual(answer, (False,
699 {0: ["000000000011", "22222"],
703 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
706 answer = write("si1", secrets,
707 {0: ([(10, 5, "lt", "11111"),
714 self.failUnlessEqual(answer, (False,
719 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
722 def test_operators(self):
723 # test operators, the data we're comparing is '11111' in all cases.
724 # test both fail+pass, reset data after each one.
725 ss = self.create("test_operators")
727 secrets = ( self.write_enabler("we1"),
728 self.renew_secret("we1"),
729 self.cancel_secret("we1") )
730 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
731 write = ss.remote_slot_testv_and_readv_and_writev
732 read = ss.remote_slot_readv
735 write("si1", secrets,
736 {0: ([], [(0,data)], None)},
742 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
747 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
748 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
749 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
752 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
757 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
758 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
761 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
766 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
767 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
771 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
776 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
777 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
780 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
785 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
786 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
789 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
794 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
795 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
799 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
804 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
805 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
808 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
813 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
814 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
818 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
823 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
824 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
827 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
832 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
833 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
837 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
842 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
843 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
846 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
851 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
852 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
855 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
860 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
861 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
865 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
870 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
871 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
874 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
879 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
880 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
883 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
888 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
889 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
892 # finally, test some operators against empty shares
893 answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
898 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
899 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
902 def test_readv(self):
903 ss = self.create("test_readv")
904 secrets = ( self.write_enabler("we1"),
905 self.renew_secret("we1"),
906 self.cancel_secret("we1") )
907 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
908 write = ss.remote_slot_testv_and_readv_and_writev
909 read = ss.remote_slot_readv
910 data = [("%d" % i) * 100 for i in range(3)]
911 rc = write("si1", secrets,
912 {0: ([], [(0,data[0])], None),
913 1: ([], [(0,data[1])], None),
914 2: ([], [(0,data[2])], None),
916 self.failUnlessEqual(rc, (True, {}))
918 answer = read("si1", [], [(0, 10)])
919 self.failUnlessEqual(answer, {0: ["0"*10],
923 def compare_leases_without_timestamps(self, a, b):
924 self.failUnlessEqual(len(a), len(b))
925 for i in range(len(a)):
926 (num_a, (ownerid_a, expiration_time_a,
927 renew_secret_a, cancel_secret_a, nodeid_a)) = a[i]
928 (num_b, (ownerid_b, expiration_time_b,
929 renew_secret_b, cancel_secret_b, nodeid_b)) = b[i]
930 self.failUnlessEqual( (num_a, ownerid_a, renew_secret_a,
931 cancel_secret_a, nodeid_a),
932 (num_b, ownerid_b, renew_secret_b,
933 cancel_secret_b, nodeid_b) )
935 def test_leases(self):
936 ss = self.create("test_leases", sizelimit=1000*1000)
938 return ( self.write_enabler("we1"),
939 self.renew_secret("we1-%d" % n),
940 self.cancel_secret("we1-%d" % n) )
941 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
942 write = ss.remote_slot_testv_and_readv_and_writev
943 read = ss.remote_slot_readv
944 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
945 self.failUnlessEqual(rc, (True, {}))
947 # create a random non-numeric file in the bucket directory, to
948 # exercise the code that's supposed to ignore those.
949 bucket_dir = os.path.join(self.workdir("test_leases"),
950 "shares", storage_index_to_dir("si1"))
951 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
952 f.write("you ought to be ignoring me\n")
955 # re-allocate the slots and use the same secrets, that should update
957 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
960 ss.remote_renew_lease("si1", secrets(0)[1])
962 # now allocate them with a bunch of different secrets, to trigger the
963 # extended lease code
964 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
965 write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
966 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
967 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
968 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
971 ss.remote_cancel_lease("si1", secrets(5)[2])
973 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
974 all_leases = s0.debug_get_leases()
975 self.failUnlessEqual(len(all_leases), 5)
977 # and write enough data to expand the container, forcing the server
979 write("si1", secrets(0),
980 {0: ([], [(0,data)], 200), },
983 # read back the leases, make sure they're still intact.
984 self.compare_leases_without_timestamps(all_leases,
985 s0.debug_get_leases())
987 ss.remote_renew_lease("si1", secrets(0)[1])
988 ss.remote_renew_lease("si1", secrets(1)[1])
989 ss.remote_renew_lease("si1", secrets(2)[1])
990 ss.remote_renew_lease("si1", secrets(3)[1])
991 ss.remote_renew_lease("si1", secrets(4)[1])
992 self.compare_leases_without_timestamps(all_leases,
993 s0.debug_get_leases())
994 # get a new copy of the leases, with the current timestamps. Reading
995 # data and failing to renew/cancel leases should leave the timestamps
997 all_leases = s0.debug_get_leases()
998 # renewing with a bogus token should prompt an error message
1000 # TODO: examine the exception thus raised, make sure the old nodeid
1001 # is present, to provide for share migration
1002 self.failUnlessRaises(IndexError,
1003 ss.remote_renew_lease, "si1",
1005 # same for cancelling
1006 self.failUnlessRaises(IndexError,
1007 ss.remote_cancel_lease, "si1",
1009 self.failUnlessEqual(all_leases, s0.debug_get_leases())
1011 # reading shares should not modify the timestamp
1012 read("si1", [], [(0,200)])
1013 self.failUnlessEqual(all_leases, s0.debug_get_leases())
1015 write("si1", secrets(0),
1016 {0: ([], [(200, "make me bigger")], None)}, [])
1017 self.compare_leases_without_timestamps(all_leases,
1018 s0.debug_get_leases())
1020 write("si1", secrets(0),
1021 {0: ([], [(500, "make me really bigger")], None)}, [])
1022 self.compare_leases_without_timestamps(all_leases,
1023 s0.debug_get_leases())
1025 # now cancel them all
1026 ss.remote_cancel_lease("si1", secrets(0)[2])
1027 ss.remote_cancel_lease("si1", secrets(1)[2])
1028 ss.remote_cancel_lease("si1", secrets(2)[2])
1029 ss.remote_cancel_lease("si1", secrets(3)[2])
1031 # the slot should still be there
1032 remaining_shares = read("si1", [], [(0,10)])
1033 self.failUnlessEqual(len(remaining_shares), 1)
1034 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1036 # cancelling a non-existent lease should raise an IndexError
1037 self.failUnlessRaises(IndexError,
1038 ss.remote_cancel_lease, "si1", "nonsecret")
1040 # and the slot should still be there
1041 remaining_shares = read("si1", [], [(0,10)])
1042 self.failUnlessEqual(len(remaining_shares), 1)
1043 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1045 ss.remote_cancel_lease("si1", secrets(4)[2])
1046 # now the slot should be gone
1047 no_shares = read("si1", [], [(0,10)])
1048 self.failUnlessEqual(no_shares, {})
1050 # cancelling a lease on a non-existent share should raise an IndexError
1051 self.failUnlessRaises(IndexError,
1052 ss.remote_cancel_lease, "si2", "nonsecret")
1055 class Stats(unittest.TestCase):
1058 self.sparent = LoggingServiceParent()
1059 self._lease_secret = itertools.count()
1061 return self.sparent.stopService()
1063 def workdir(self, name):
1064 basedir = os.path.join("storage", "Server", name)
1067 def create(self, name, sizelimit=None):
1068 workdir = self.workdir(name)
1069 ss = StorageServer(workdir, sizelimit)
1070 ss.setServiceParent(self.sparent)
1073 def test_latencies(self):
1074 ss = self.create("test_latencies")
1075 for i in range(10000):
1076 ss.add_latency("allocate", 1.0 * i)
1077 for i in range(1000):
1078 ss.add_latency("renew", 1.0 * i)
1080 ss.add_latency("cancel", 2.0 * i)
1081 ss.add_latency("get", 5.0)
1083 output = ss.get_latencies()
1085 self.failUnlessEqual(sorted(output.keys()),
1086 sorted(["allocate", "renew", "cancel", "get"]))
1087 self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1088 self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
1089 self.failUnless(abs(output["allocate"]["median"] - 9500) < 1)
1090 self.failUnless(abs(output["allocate"]["90_percentile"] - 9900) < 1)
1091 self.failUnless(abs(output["allocate"]["95_percentile"] - 9950) < 1)
1092 self.failUnless(abs(output["allocate"]["99_percentile"] - 9990) < 1)
1093 self.failUnless(abs(output["allocate"]["999_percentile"] - 9999) < 1)
1095 self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1096 self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
1097 self.failUnless(abs(output["renew"]["median"] - 500) < 1)
1098 self.failUnless(abs(output["renew"]["90_percentile"] - 900) < 1)
1099 self.failUnless(abs(output["renew"]["95_percentile"] - 950) < 1)
1100 self.failUnless(abs(output["renew"]["99_percentile"] - 990) < 1)
1101 self.failUnless(abs(output["renew"]["999_percentile"] - 999) < 1)
1103 self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1104 self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
1105 self.failUnless(abs(output["cancel"]["median"] - 10) < 1)
1106 self.failUnless(abs(output["cancel"]["90_percentile"] - 18) < 1)
1107 self.failUnless(abs(output["cancel"]["95_percentile"] - 18) < 1)
1108 self.failUnless(abs(output["cancel"]["99_percentile"] - 18) < 1)
1109 self.failUnless(abs(output["cancel"]["999_percentile"] - 18) < 1)
1111 self.failUnlessEqual(len(ss.latencies["get"]), 1)
1112 self.failUnless(abs(output["get"]["mean"] - 5) < 1)
1113 self.failUnless(abs(output["get"]["median"] - 5) < 1)
1114 self.failUnless(abs(output["get"]["90_percentile"] - 5) < 1)
1115 self.failUnless(abs(output["get"]["95_percentile"] - 5) < 1)
1116 self.failUnless(abs(output["get"]["99_percentile"] - 5) < 1)
1117 self.failUnless(abs(output["get"]["999_percentile"] - 5) < 1)