2 from twisted.trial import unittest
4 from twisted.internet import defer
5 import time, os.path, stat
7 from allmydata import interfaces
8 from allmydata.util import fileutil, hashutil
9 from allmydata.storage import BucketWriter, BucketReader, \
10 WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \
11 storage_index_to_dir, DataTooLargeError
12 from allmydata.interfaces import BadWriteEnablerError
13 from allmydata.test.common import LoggingServiceParent
def __init__(self, ignore_disconnectors=False):
    """Fake Foolscap canary: track registered disconnect callbacks.

    ignore_disconnectors -- flag recorded for later consultation by the
    (elided) notifyOnDisconnect/dontNotifyOnDisconnect logic.
    """
    # Start with no registered callbacks; populated by notifyOnDisconnect.
    self.disconnectors = {}
    self.ignore = ignore_disconnectors
# Register callback f(*args, **kwargs) to be invoked on (simulated)
# disconnect, keyed by a marker 'm'.
# NOTE(review): original lines 22-24 are elided from this listing; they
# presumably create the marker 'm' (and honor self.ignore) -- confirm
# against the upstream file before editing.
21 def notifyOnDisconnect(self, f, *args, **kwargs):
25 self.disconnectors[m] = (f, args, kwargs)
# Deregister a previously registered disconnect callback by its marker.
# NOTE(review): original lines 28-29 are elided; presumably an early
# return when self.ignore is set -- confirm against the upstream file.
27 def dontNotifyOnDisconnect(self, marker):
30 del self.disconnectors[marker]
# Stub stats provider handed to StorageServer in these tests; both
# methods are no-ops.
# NOTE(review): the method bodies (lines 34, 36-37, presumably bare
# 'pass' statements) are elided from this listing.
32 class FakeStatsProvider:
33 def count(self, name, delta=1):
35 def register_producer(self, producer):
# Tests for BucketWriter/BucketReader share-file round-trips.  The test
# case itself doubles as the BucketWriter's "server" object, providing
# bucket_writer_closed/add_latency/count callbacks.
# NOTE(review): several original lines are elided from this listing
# (e.g. 46, 48, 50, 52-55, 64, 69-70, 74, 78-80); do not assume the
# visible lines are contiguous code.
38 class Bucket(unittest.TestCase):
# Create (and mkdir) per-test incoming/final share paths under
# storage/Bucket/<name>.
39 def make_workdir(self, name):
40 basedir = os.path.join("storage", "Bucket", name)
41 incoming = os.path.join(basedir, "tmp", "bucket")
42 final = os.path.join(basedir, "bucket")
43 fileutil.make_dirs(basedir)
44 fileutil.make_dirs(os.path.join(basedir, "tmp"))
45 return incoming, final
# Server-interface stubs consumed by BucketWriter; bodies (no-ops) elided.
47 def bucket_writer_closed(self, bw, consumed):
49 def add_latency(self, category, latency):
51 def count(self, name, delta=1):
# Lease-tuple factory.  NOTE(review): the 'def make_lease(self):' header
# and the 'owner_num = 0' assignment (lines ~53-55) are elided.
56 renew_secret = os.urandom(32)
57 cancel_secret = os.urandom(32)
58 expiration_time = time.time() + 5000
59 return (owner_num, renew_secret, cancel_secret, expiration_time)
# Write 82 bytes into a 200-byte bucket in four chunks.
# NOTE(review): line 64 (remaining BucketWriter args, presumably a
# FakeCanary) and lines 69-70 (presumably bw.remote_close()) are elided.
61 def test_create(self):
62 incoming, final = self.make_workdir("test_create")
63 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
65 bw.remote_write(0, "a"*25)
66 bw.remote_write(25, "b"*25)
67 bw.remote_write(50, "c"*25)
68 bw.remote_write(75, "d"*7)
# Write then read the same data back through a BucketReader.
# NOTE(review): lines 74 and 78-80 (constructor continuation and the
# close call before reading) are elided.
71 def test_readwrite(self):
72 incoming, final = self.make_workdir("test_readwrite")
73 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
75 bw.remote_write(0, "a"*25)
76 bw.remote_write(25, "b"*25)
77 bw.remote_write(50, "c"*7) # last block may be short
81 br = BucketReader(self, bw.finalhome)
82 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
83 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
84 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
# Dispatch a fake remote call: resolve "remote_<methname>" on
# self.target and run it inside defer.maybeDeferred so exceptions become
# failed Deferreds, mimicking a Foolscap RemoteReference.
# NOTE(review): line 89 ('def _call():', the inner closure header) is
# elided from this listing.
88 def callRemote(self, methname, *args, **kwargs):
90 meth = getattr(self.target, "remote_" + methname)
91 return meth(*args, **kwargs)
92 return defer.maybeDeferred(_call)
# Tests for WriteBucketProxy/ReadBucketProxy layered over a fake remote
# bucket (RemoteBucket + BucketWriter/BucketReader).  The test case again
# doubles as the storage "server" object.
# NOTE(review): many original lines are elided from this listing (e.g.
# 102-106, 108, 113, 115, 117, 119-120, 124-127, 130, 139, 141, 143,
# 145, 147, 149, 151, 154-157, 159-161, 172, 176-177, 181, 192, 206-208,
# 210-214); the visible lines are not contiguous code.
94 class BucketProxy(unittest.TestCase):
# Build a BucketWriter plus its RemoteBucket wrapper; returns
# (bw, rb, sharefname).  NOTE(review): constructor continuation and the
# RemoteBucket wiring (lines 102-106) are elided.
95 def make_bucket(self, name, size):
96 basedir = os.path.join("storage", "BucketProxy", name)
97 incoming = os.path.join(basedir, "tmp", "bucket")
98 final = os.path.join(basedir, "bucket")
99 fileutil.make_dirs(basedir)
100 fileutil.make_dirs(os.path.join(basedir, "tmp"))
101 bw = BucketWriter(self, incoming, final, size, self.make_lease(),
# Lease-tuple factory; 'owner_num = 0' (line 108) is elided.
107 def make_lease(self):
109 renew_secret = os.urandom(32)
110 cancel_secret = os.urandom(32)
111 expiration_time = time.time() + 5000
112 return (owner_num, renew_secret, cancel_secret, expiration_time)
# Server-interface stubs; no-op bodies elided.
114 def bucket_writer_closed(self, bw, consumed):
116 def add_latency(self, category, latency):
118 def count(self, name, delta=1):
# Smoke test: a WriteBucketProxy provides IStorageBucketWriter.
121 def test_create(self):
122 bw, rb, sharefname = self.make_bucket("test_create", 500)
123 bp = WriteBucketProxy(rb,
128 uri_extension_size=500, nodeid=None)
129 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
# Full write/read round-trip through the proxies.
131 def test_readwrite(self):
132 # Let's pretend each share has 100 bytes of data, and that there are
133 # 4 segments (25 bytes each), and 8 shares total. So the three
134 # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
135 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
136 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
137 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
138 # long. That should make the whole share:
140 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
# NOTE(review): the list-comprehension 'for i in range(...)]' tails
# (lines 143, 145, 147, 149) are elided below.
142 plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
144 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
146 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
148 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
150 uri_extension = "s" + "E"*498 + "e"
152 bw, rb, sharefname = self.make_bucket("test_readwrite", 1414)
153 bp = WriteBucketProxy(rb,
158 uri_extension_size=len(uri_extension),
# NOTE(review): creation of Deferred 'd' (around lines 160-161) is
# elided; the chain below writes blocks, hash trees, and the URI
# extension, then closes the bucket.
162 d.addCallback(lambda res: bp.put_block(0, "a"*25))
163 d.addCallback(lambda res: bp.put_block(1, "b"*25))
164 d.addCallback(lambda res: bp.put_block(2, "c"*25))
165 d.addCallback(lambda res: bp.put_block(3, "d"*20))
166 d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
167 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
168 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
169 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
170 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
171 d.addCallback(lambda res: bp.close())
173 # now read everything back
174 def _start_reading(res):
175 br = BucketReader(self, sharefname)
178 rbp = ReadBucketProxy(rb, peerid="abc")
179 self.failUnless("to peer" in repr(rbp))
180 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
182 d1 = rbp.startIfNecessary()
183 d1.addCallback(lambda res: rbp.startIfNecessary()) # idempotent
184 d1.addCallback(lambda res: rbp.get_block(0))
185 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
186 d1.addCallback(lambda res: rbp.get_block(1))
187 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
188 d1.addCallback(lambda res: rbp.get_block(2))
189 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
190 d1.addCallback(lambda res: rbp.get_block(3))
191 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
193 d1.addCallback(lambda res: rbp.get_plaintext_hashes())
194 d1.addCallback(lambda res:
195 self.failUnlessEqual(res, plaintext_hashes))
196 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
197 d1.addCallback(lambda res:
198 self.failUnlessEqual(res, crypttext_hashes))
199 d1.addCallback(lambda res: rbp.get_block_hashes())
200 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
201 d1.addCallback(lambda res: rbp.get_share_hashes())
202 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
203 d1.addCallback(lambda res: rbp.get_uri_extension())
204 d1.addCallback(lambda res:
205 self.failUnlessEqual(res, uri_extension))
# NOTE(review): 'return d1' (around lines 206-208) is elided.
209 d.addCallback(_start_reading)
# Tests for the immutable-share side of StorageServer: allocation,
# read-back, size limits, leases, disconnect handling, readonly and
# discard modes.
# NOTE(review): this listing elides many original lines (setUp/tearDown
# headers, 'return' statements, abort/close calls, etc.); elisions are
# flagged where they materially affect reading.
215 class Server(unittest.TestCase):
# setUp (header elided, ~line 217): parent service plus a fresh
# lease-secret counter.
218 self.sparent = LoggingServiceParent()
219 self._lease_secret = itertools.count()
# tearDown (header elided, ~line 220).
221 return self.sparent.stopService()
223 def workdir(self, name):
224 basedir = os.path.join("storage", "Server", name)
# NOTE(review): 'return basedir' (~line 225) is elided.
# Build a StorageServer rooted in a per-test workdir.
227 def create(self, name, sizelimit=None):
228 workdir = self.workdir(name)
229 ss = StorageServer(workdir, sizelimit,
230 stats_provider=FakeStatsProvider())
231 ss.setServiceParent(self.sparent)
# NOTE(review): 'return ss' (~line 232) is elided.
234 def test_create(self):
235 ss = self.create("test_create")
# Helper: allocate buckets with fresh lease secrets.  NOTE(review): the
# 'if not canary:' line (~240) before the FakeCanary() fallback is
# elided.  Python 2 idiom: self._lease_secret.next().
237 def allocate(self, ss, storage_index, sharenums, size, canary=None):
238 renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
239 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
241 canary = FakeCanary()
242 return ss.remote_allocate_buckets(storage_index,
243 renew_secret, cancel_secret,
244 sharenums, size, canary)
246 def test_dont_overfill_dirs(self):
248 This test asserts that if you add a second share whose storage index
249 shares lots of leading bits with an extant share (but isn't the exact
250 same storage index), this won't add an entry to the share directory.
252 ss = self.create("test_dont_overfill_dirs")
253 already, writers = self.allocate(ss, "storageindex", [0], 10)
254 for i, wb in writers.items():
255 wb.remote_write(0, "%10d" % i)
# NOTE(review): the remote_close() call (~line 256) and the "shares"
# path component (~line 258) are elided.
257 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
259 children_of_storedir = set(os.listdir(storedir))
261 # Now store another one under another storageindex that has leading
262 # chars the same as the first storageindex.
263 already, writers = self.allocate(ss, "storageindey", [0], 10)
264 for i, wb in writers.items():
265 wb.remote_write(0, "%10d" % i)
267 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
269 new_children_of_storedir = set(os.listdir(storedir))
270 self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
# Closing all writers must clean the incoming/ tree back out.
272 def test_remove_incoming(self):
273 ss = self.create("test_remove_incoming")
274 already, writers = self.allocate(ss, "vid", range(3), 10)
275 for i,wb in writers.items():
276 wb.remote_write(0, "%10d" % i)
# NOTE(review): the remote_close() call (~line 277) is elided.
278 incomingdir = os.path.dirname(os.path.dirname(os.path.dirname(wb.incominghome)))
279 self.failIf(os.path.exists(incomingdir))
281 def test_allocate(self):
282 ss = self.create("test_allocate")
284 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
286 canary = FakeCanary()
287 already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
288 self.failUnlessEqual(already, set())
289 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
291 # while the buckets are open, they should not count as readable
292 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
295 for i,wb in writers.items():
296 wb.remote_write(0, "%25d" % i)
# NOTE(review): remote_close()/remote_abort() calls (~297, 299-300)
# are elided here and at several points below.
298 # aborting a bucket that was already closed is a no-op
301 # now they should be readable
302 b = ss.remote_get_buckets("allocate")
303 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
304 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
306 # now if we ask about writing again, the server should offer those
307 # three buckets as already present. It should offer them even if we
308 # don't ask about those specific ones.
309 already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
310 self.failUnlessEqual(already, set([0,1,2]))
311 self.failUnlessEqual(set(writers.keys()), set([3,4]))
313 # while those two buckets are open for writing, the server should
314 # refuse to offer them to uploaders
316 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
317 self.failUnlessEqual(already2, set([0,1,2]))
318 self.failUnlessEqual(set(writers2.keys()), set([5]))
320 # aborting the writes should remove the tempfiles
321 for i,wb in writers2.items():
323 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
324 self.failUnlessEqual(already2, set([0,1,2]))
325 self.failUnlessEqual(set(writers2.keys()), set([5]))
327 for i,wb in writers2.items():
329 for i,wb in writers.items():
# Disconnecting the canary should abandon in-progress uploads.
332 def test_disconnect(self):
333 # simulate a disconnection
334 ss = self.create("test_disconnect")
335 canary = FakeCanary()
336 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
337 self.failUnlessEqual(already, set())
338 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
# Fire every registered disconnect callback by hand.  NOTE(review): the
# 'f(*args, **kwargs)' body (~340-343) is elided.
339 for (f,args,kwargs) in canary.disconnectors.values():
344 # that ought to delete the incoming shares
345 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
346 self.failUnlessEqual(already, set())
347 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
# Exercise the sizelimit accounting, including persistence across a
# server restart.
349 def test_sizelimits(self):
350 ss = self.create("test_sizelimits", 5000)
351 # a newly created and filled share incurs this much overhead, beyond
352 # the size we request.
# NOTE(review): 'OVERHEAD = ...' (~line 353) is elided.
354 LEASE_SIZE = 4+32+32+4
355 canary = FakeCanary(True)
356 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
357 self.failUnlessEqual(len(writers), 3)
358 # now the StorageServer should have 3000 bytes provisionally
359 # allocated, allowing only 2000 more to be claimed
360 self.failUnlessEqual(len(ss._active_writers), 3)
362 # allocating 1001-byte shares only leaves room for one
363 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
364 self.failUnlessEqual(len(writers2), 1)
365 self.failUnlessEqual(len(ss._active_writers), 4)
367 # we abandon the first set, so their provisional allocation should be
# NOTE(review): the abort loop (~368-370) is elided.
371 self.failUnlessEqual(len(ss._active_writers), 1)
372 # now we have a provisional allocation of 1001 bytes
374 # and we close the second set, so their provisional allocation should
375 # become real, long-term allocation, and grows to include the
377 for bw in writers2.values():
378 bw.remote_write(0, "a"*25)
# NOTE(review): remote_close() (~379-382) is elided.
383 self.failUnlessEqual(len(ss._active_writers), 0)
385 allocated = 1001 + OVERHEAD + LEASE_SIZE
386 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
387 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
388 already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
389 self.failUnlessEqual(len(writers3), 39)
390 self.failUnlessEqual(len(ss._active_writers), 39)
# NOTE(review): the abort loop for writers3 (~391-393) is elided.
394 self.failUnlessEqual(len(ss._active_writers), 0)
395 ss.disownServiceParent()
398 # creating a new StorageServer in the same directory should see the
401 # metadata that goes into the share file is counted upon share close,
402 # as well as at startup. metadata that goes into other files will not
403 # be counted until the next startup, so if we were creating any
404 # extra-file metadata, the allocation would be more than 'allocated'
405 # and this test would need to be changed.
406 ss = self.create("test_sizelimits", 5000)
407 already4,writers4 = self.allocate(ss, "vid4", range(100), 100, canary)
408 self.failUnlessEqual(len(writers4), 39)
409 self.failUnlessEqual(len(ss._active_writers), 39)
# Document that mode "rb+" supports seek-past-end writes without
# truncation.  NOTE(review): the 'def test_seek_behavior(self):' header
# (~410-411) and the write/seek statements (~416-417, 422-424) are
# elided.
412 basedir = self.workdir("test_seek_behavior")
413 fileutil.make_dirs(basedir)
414 filename = os.path.join(basedir, "testfile")
415 f = open(filename, "wb")
418 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
419 # files. mode="a" preserves previous contents but does not allow
420 # seeking-to-create-holes. mode="r+" allows both.
421 f = open(filename, "rb+")
425 filelen = os.stat(filename)[stat.ST_SIZE]
426 self.failUnlessEqual(filelen, 100+3)
427 f2 = open(filename, "rb")
428 self.failUnlessEqual(f2.read(5), "start")
# Lease lifecycle on immutable shares: create, renew, cancel, and the
# IndexError raised for unknown secrets.
431 def test_leases(self):
432 ss = self.create("test_leases")
433 canary = FakeCanary()
# NOTE(review): 'sharenums = range(5)' / 'size = 100' (~434-436) are
# elided.
437 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
438 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
439 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
440 sharenums, size, canary)
441 self.failUnlessEqual(len(already), 0)
442 self.failUnlessEqual(len(writers), 5)
443 for wb in writers.values():
# NOTE(review): 'wb.remote_close()' (~444-445) is elided, here and in
# the parallel loops below.
446 leases = list(ss.get_leases("si0"))
447 self.failUnlessEqual(len(leases), 1)
448 self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))
450 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
451 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
452 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
453 sharenums, size, canary)
454 for wb in writers.values():
457 # take out a second lease on si1
458 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
459 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
460 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
461 sharenums, size, canary)
462 self.failUnlessEqual(len(already), 5)
463 self.failUnlessEqual(len(writers), 0)
465 leases = list(ss.get_leases("si1"))
466 self.failUnlessEqual(len(leases), 2)
467 self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))
469 # check that si0 is readable
470 readers = ss.remote_get_buckets("si0")
471 self.failUnlessEqual(len(readers), 5)
473 # renew the first lease. Only the proper renew_secret should work
474 ss.remote_renew_lease("si0", rs0)
475 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
476 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
478 # check that si0 is still readable
479 readers = ss.remote_get_buckets("si0")
480 self.failUnlessEqual(len(readers), 5)
# Cancelling needs the matching cancel secret.
483 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
484 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
485 ss.remote_cancel_lease("si0", cs0)
487 # si0 should now be gone
488 readers = ss.remote_get_buckets("si0")
489 self.failUnlessEqual(len(readers), 0)
490 # and the renew should no longer work
491 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
494 # cancel the first lease on si1, leaving the second in place
495 ss.remote_cancel_lease("si1", cs1)
496 readers = ss.remote_get_buckets("si1")
497 self.failUnlessEqual(len(readers), 5)
498 # the corresponding renew should no longer work
499 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
501 leases = list(ss.get_leases("si1"))
502 self.failUnlessEqual(len(leases), 1)
503 self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))
505 ss.remote_renew_lease("si1", rs2)
506 # cancelling the second should make it go away
507 ss.remote_cancel_lease("si1", cs2)
508 readers = ss.remote_get_buckets("si1")
509 self.failUnlessEqual(len(readers), 0)
510 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
511 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
513 leases = list(ss.get_leases("si1"))
514 self.failUnlessEqual(len(leases), 0)
517 # test overlapping uploads
518 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
519 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
520 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
521 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
522 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
523 sharenums, size, canary)
524 self.failUnlessEqual(len(already), 0)
525 self.failUnlessEqual(len(writers), 5)
526 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
527 sharenums, size, canary)
528 self.failUnlessEqual(len(already2), 0)
529 self.failUnlessEqual(len(writers2), 0)
530 for wb in writers.values():
533 leases = list(ss.get_leases("si3"))
534 self.failUnlessEqual(len(leases), 1)
536 already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
537 sharenums, size, canary)
538 self.failUnlessEqual(len(already3), 5)
539 self.failUnlessEqual(len(writers3), 0)
541 leases = list(ss.get_leases("si3"))
542 self.failUnlessEqual(len(leases), 2)
# A readonly server hands out no writers at all.
544 def test_readonly(self):
545 workdir = self.workdir("test_readonly")
546 ss = StorageServer(workdir, readonly_storage=True)
547 ss.setServiceParent(self.sparent)
549 canary = FakeCanary()
550 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
551 self.failUnlessEqual(already, set())
552 self.failUnlessEqual(writers, {})
554 def test_discard(self):
555 # discard is really only used for other tests, but we test it anyways
556 workdir = self.workdir("test_discard")
557 ss = StorageServer(workdir, discard_storage=True)
558 ss.setServiceParent(self.sparent)
560 canary = FakeCanary()
561 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
562 self.failUnlessEqual(already, set())
563 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
564 for i,wb in writers.items():
565 wb.remote_write(0, "%25d" % i)
567 # since we discard the data, the shares should be present but sparse.
568 # Since we write with some seeks, the data we read back will be all
# NOTE(review): the remote_close() loop (~566) and the end of this
# comment ("zeros", ~569) are elided.
570 b = ss.remote_get_buckets("vid")
571 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
572 self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
# Tests for the mutable-share (slot) side of StorageServer:
# testv-and-readv-and-writev semantics, container sizing, comparison
# operators, readv, and the mutable-lease lifecycle.
# NOTE(review): as elsewhere in this listing, many original lines are
# elided (setUp/tearDown headers, 'return' statements, multi-line call
# continuations); elisions are flagged where they matter most.
576 class MutableServer(unittest.TestCase):
# setUp (header elided, ~line 578).
579 self.sparent = LoggingServiceParent()
580 self._lease_secret = itertools.count()
# tearDown (header elided, ~line 581).
582 return self.sparent.stopService()
584 def workdir(self, name):
585 basedir = os.path.join("storage", "MutableServer", name)
# NOTE(review): 'return basedir' (~line 586) is elided.
588 def create(self, name, sizelimit=None):
589 workdir = self.workdir(name)
590 ss = StorageServer(workdir, sizelimit)
591 ss.setServiceParent(self.sparent)
# The write-enabler check later reports this all-zero nodeid as 'aaaa...'
# (base32).
592 ss.setNodeID("\x00" * 32)
# NOTE(review): 'return ss' (~line 593) is elided.
595 def test_create(self):
596 ss = self.create("test_create")
# Deterministic secret factories keyed by tag.
598 def write_enabler(self, we_tag):
599 return hashutil.tagged_hash("we_blah", we_tag)
601 def renew_secret(self, tag):
602 return hashutil.tagged_hash("renew_blah", str(tag))
604 def cancel_secret(self, tag):
605 return hashutil.tagged_hash("cancel_blah", str(tag))
# Helper: create empty slots via an unconditional empty writev, then
# sanity-check the (did_write, readv_data) result.
607 def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
608 write_enabler = self.write_enabler(we_tag)
609 renew_secret = self.renew_secret(lease_tag)
610 cancel_secret = self.cancel_secret(lease_tag)
611 rstaraw = ss.remote_slot_testv_and_readv_and_writev
612 testandwritev = dict( [ (shnum, ([], [], None) )
613 for shnum in sharenums ] )
# NOTE(review): the trailing call arguments (~617-618, the testandwritev
# dict and read vector) are elided.
615 rc = rstaraw(storage_index,
616 (write_enabler, renew_secret, cancel_secret),
619 (did_write, readv_data) = rc
620 self.failUnless(did_write)
621 self.failUnless(isinstance(readv_data, dict))
622 self.failUnlessEqual(len(readv_data), 0)
624 def test_container_size(self):
625 ss = self.create("test_container_size")
626 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
628 rstaraw = ss.remote_slot_testv_and_readv_and_writev
629 secrets = ( self.write_enabler("we1"),
630 self.renew_secret("we1"),
631 self.cancel_secret("we1") )
632 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
633 answer = rstaraw("si1", secrets,
634 {0: ([], [(0,data)], len(data)+12)},
# NOTE(review): the final read-vector argument '[]' (~635) is elided
# from the calls in this method.
636 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
638 # trying to make the container too large will raise an exception
639 TOOBIG = MutableShareFile.MAX_SIZE + 10
640 self.failUnlessRaises(DataTooLargeError,
641 rstaraw, "si1", secrets,
642 {0: ([], [(0,data)], TOOBIG)},
645 # it should be possible to make the container smaller, although at
646 # the moment this doesn't actually affect the share
647 answer = rstaraw("si1", secrets,
648 {0: ([], [(0,data)], len(data)+8)},
650 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
652 def test_allocate(self):
653 ss = self.create("test_allocate")
654 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
# Freshly allocated slots read back as empty strings.
657 read = ss.remote_slot_readv
658 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
660 self.failUnlessEqual(read("si1", [], [(0, 10)]),
661 {0: [""], 1: [""], 2: [""]})
662 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
666 secrets = ( self.write_enabler("we1"),
667 self.renew_secret("we1"),
668 self.cancel_secret("we1") )
669 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
670 write = ss.remote_slot_testv_and_readv_and_writev
671 answer = write("si1", secrets,
672 {0: ([], [(0,data)], None)},
674 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
676 self.failUnlessEqual(read("si1", [0], [(0,20)]),
677 {0: ["00000000001111111111"]})
678 self.failUnlessEqual(read("si1", [0], [(95,10)]),
680 #self.failUnlessEqual(s0.remote_get_length(), 100)
# A bad write enabler must be rejected, and the error message must name
# the nodeid that recorded the original enabler (base32 of 32 zero
# bytes -> all 'a's).
682 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
683 f = self.failUnlessRaises(BadWriteEnablerError,
684 write, "si1", bad_secrets,
686 self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)
688 # this testv should fail
689 answer = write("si1", secrets,
690 {0: ([(0, 12, "eq", "444444444444"),
691 (20, 5, "eq", "22222"),
# NOTE(review): the remainder of this failing-testv call and its
# expected-answer tuple (~692-697, 700-702) are elided.
698 self.failUnlessEqual(answer, (False,
699 {0: ["000000000011", "22222"],
703 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
706 answer = write("si1", secrets,
707 {0: ([(10, 5, "lt", "11111"),
714 self.failUnlessEqual(answer, (False,
719 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
# Exhaustive check of the six test-vector comparison operators; the
# probed span always contains "11111".  Each case resets the data
# afterwards (reset writes elided from this listing, ~737-741 etc.).
722 def test_operators(self):
723 # test operators, the data we're comparing is '11111' in all cases.
724 # test both fail+pass, reset data after each one.
725 ss = self.create("test_operators")
727 secrets = ( self.write_enabler("we1"),
728 self.renew_secret("we1"),
729 self.cancel_secret("we1") )
730 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
731 write = ss.remote_slot_testv_and_readv_and_writev
732 read = ss.remote_slot_readv
735 write("si1", secrets,
736 {0: ([], [(0,data)], None)},
# lt: fails against smaller and equal operands, passes against larger.
742 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
747 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
748 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
749 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
752 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
757 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
758 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
761 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
766 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
767 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
# le: passes for equal or larger operands.
771 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
776 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
777 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
780 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
785 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
786 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
789 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
794 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
795 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
# eq: exact match only.
799 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
804 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
805 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
808 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
813 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
814 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
# ne: anything but an exact match.
818 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
823 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
824 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
827 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
832 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
833 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
# ge: passes for smaller or equal operands.
837 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
842 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
843 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
846 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
851 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
852 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
855 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
860 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
861 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
# gt: strictly smaller operands only.
865 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
870 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
871 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
874 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
879 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
880 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
883 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
888 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
889 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
# readv across multiple shares with distinct contents.
892 def test_readv(self):
893 ss = self.create("test_readv")
894 secrets = ( self.write_enabler("we1"),
895 self.renew_secret("we1"),
896 self.cancel_secret("we1") )
897 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
898 write = ss.remote_slot_testv_and_readv_and_writev
899 read = ss.remote_slot_readv
900 data = [("%d" % i) * 100 for i in range(3)]
901 rc = write("si1", secrets,
902 {0: ([], [(0,data[0])], None),
903 1: ([], [(0,data[1])], None),
904 2: ([], [(0,data[2])], None),
906 self.failUnlessEqual(rc, (True, {}))
908 answer = read("si1", [], [(0, 10)])
909 self.failUnlessEqual(answer, {0: ["0"*10],
# NOTE(review): the expected entries for shares 1 and 2 (~910-911) are
# elided.
# Compare two debug_get_leases() snapshots while ignoring the
# expiration-time field, which legitimately changes on renewal.
913 def compare_leases_without_timestamps(self, a, b):
914 self.failUnlessEqual(len(a), len(b))
915 for i in range(len(a)):
916 (num_a, (ownerid_a, expiration_time_a,
917 renew_secret_a, cancel_secret_a, nodeid_a)) = a[i]
918 (num_b, (ownerid_b, expiration_time_b,
919 renew_secret_b, cancel_secret_b, nodeid_b)) = b[i]
920 self.failUnlessEqual( (num_a, ownerid_a, renew_secret_a,
921 cancel_secret_a, nodeid_a),
922 (num_b, ownerid_b, renew_secret_b,
923 cancel_secret_b, nodeid_b) )
# Mutable-lease lifecycle: multiple leases per slot, renewal, container
# growth preserving leases, and cancellation down to slot removal.
925 def test_leases(self):
926 ss = self.create("test_leases", sizelimit=1000*1000)
# NOTE(review): the 'def secrets(n):' header (~927) is elided.
928 return ( self.write_enabler("we1"),
929 self.renew_secret("we1-%d" % n),
930 self.cancel_secret("we1-%d" % n) )
931 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
932 write = ss.remote_slot_testv_and_readv_and_writev
933 read = ss.remote_slot_readv
934 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
935 self.failUnlessEqual(rc, (True, {}))
937 # create a random non-numeric file in the bucket directory, to
938 # exercise the code that's supposed to ignore those.
939 bucket_dir = os.path.join(self.workdir("test_leases"),
940 "shares", storage_index_to_dir("si1"))
941 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
942 f.write("you ought to be ignoring me\n")
# NOTE(review): 'f.close()' (~943) is elided.
945 # re-allocate the slots and use the same secrets, that should update
947 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
950 ss.remote_renew_lease("si1", secrets(0)[1])
952 # now allocate them with a bunch of different secrets, to trigger the
953 # extended lease code
954 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
955 write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
956 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
957 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
958 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
# Cancel the 6th lease, leaving 5.
961 ss.remote_cancel_lease("si1", secrets(5)[2])
963 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
964 all_leases = s0.debug_get_leases()
965 self.failUnlessEqual(len(all_leases), 5)
967 # and write enough data to expand the container, forcing the server
969 write("si1", secrets(0),
970 {0: ([], [(0,data)], 200), },
973 # read back the leases, make sure they're still intact.
974 self.compare_leases_without_timestamps(all_leases,
975 s0.debug_get_leases())
977 ss.remote_renew_lease("si1", secrets(0)[1])
978 ss.remote_renew_lease("si1", secrets(1)[1])
979 ss.remote_renew_lease("si1", secrets(2)[1])
980 ss.remote_renew_lease("si1", secrets(3)[1])
981 ss.remote_renew_lease("si1", secrets(4)[1])
982 self.compare_leases_without_timestamps(all_leases,
983 s0.debug_get_leases())
984 # get a new copy of the leases, with the current timestamps. Reading
985 # data and failing to renew/cancel leases should leave the timestamps
987 all_leases = s0.debug_get_leases()
988 # renewing with a bogus token should prompt an error message
990 # TODO: examine the exception thus raised, make sure the old nodeid
991 # is present, to provide for share migration
992 self.failUnlessRaises(IndexError,
993 ss.remote_renew_lease, "si1",
995 # same for cancelling
996 self.failUnlessRaises(IndexError,
997 ss.remote_cancel_lease, "si1",
# NOTE(review): the bogus-secret arguments (~994, 998) are elided.
999 self.failUnlessEqual(all_leases, s0.debug_get_leases())
1001 # reading shares should not modify the timestamp
1002 read("si1", [], [(0,200)])
1003 self.failUnlessEqual(all_leases, s0.debug_get_leases())
1005 write("si1", secrets(0),
1006 {0: ([], [(200, "make me bigger")], None)}, [])
1007 self.compare_leases_without_timestamps(all_leases,
1008 s0.debug_get_leases())
1010 write("si1", secrets(0),
1011 {0: ([], [(500, "make me really bigger")], None)}, [])
1012 self.compare_leases_without_timestamps(all_leases,
1013 s0.debug_get_leases())
1015 # now cancel them all
1016 ss.remote_cancel_lease("si1", secrets(0)[2])
1017 ss.remote_cancel_lease("si1", secrets(1)[2])
1018 ss.remote_cancel_lease("si1", secrets(2)[2])
1019 ss.remote_cancel_lease("si1", secrets(3)[2])
1021 # the slot should still be there
1022 remaining_shares = read("si1", [], [(0,10)])
1023 self.failUnlessEqual(len(remaining_shares), 1)
1024 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1026 # cancelling a non-existent lease should raise an IndexError
1027 self.failUnlessRaises(IndexError,
1028 ss.remote_cancel_lease, "si1", "nonsecret")
1030 # and the slot should still be there
1031 remaining_shares = read("si1", [], [(0,10)])
1032 self.failUnlessEqual(len(remaining_shares), 1)
1033 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1035 ss.remote_cancel_lease("si1", secrets(4)[2])
1036 # now the slot should be gone
1037 no_shares = read("si1", [], [(0,10)])
1038 self.failUnlessEqual(no_shares, {})
1040 # cancelling a lease on a non-existent share should raise an IndexError
1041 self.failUnlessRaises(IndexError,
1042 ss.remote_cancel_lease, "si2", "nonsecret")
# Tests for StorageServer latency-sample collection and the percentile
# statistics exposed by get_latencies().
# NOTE(review): setUp/tearDown/create headers and 'return' lines, and
# the 10-iteration cancel loop header (~1069), are elided from this
# listing.
1045 class Stats(unittest.TestCase):
1048 self.sparent = LoggingServiceParent()
1049 self._lease_secret = itertools.count()
1051 return self.sparent.stopService()
1053 def workdir(self, name):
1054 basedir = os.path.join("storage", "Server", name)
1057 def create(self, name, sizelimit=None):
1058 workdir = self.workdir(name)
1059 ss = StorageServer(workdir, sizelimit)
1060 ss.setServiceParent(self.sparent)
# Feed known samples into four categories, then check means/percentiles.
# The per-category buffers are capped at 1000 samples, so "allocate"
# keeps only its last 1000 (values 9000..9999).
1063 def test_latencies(self):
1064 ss = self.create("test_latencies")
1065 for i in range(10000):
1066 ss.add_latency("allocate", 1.0 * i)
1067 for i in range(1000):
1068 ss.add_latency("renew", 1.0 * i)
1070 ss.add_latency("cancel", 2.0 * i)
1071 ss.add_latency("get", 5.0)
1073 output = ss.get_latencies()
1075 self.failUnlessEqual(sorted(output.keys()),
1076 sorted(["allocate", "renew", "cancel", "get"]))
1077 self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1078 self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
1079 self.failUnless(abs(output["allocate"]["median"] - 9500) < 1)
1080 self.failUnless(abs(output["allocate"]["90_percentile"] - 9900) < 1)
1081 self.failUnless(abs(output["allocate"]["95_percentile"] - 9950) < 1)
1082 self.failUnless(abs(output["allocate"]["99_percentile"] - 9990) < 1)
1083 self.failUnless(abs(output["allocate"]["999_percentile"] - 9999) < 1)
1085 self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1086 self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
1087 self.failUnless(abs(output["renew"]["median"] - 500) < 1)
1088 self.failUnless(abs(output["renew"]["90_percentile"] - 900) < 1)
1089 self.failUnless(abs(output["renew"]["95_percentile"] - 950) < 1)
1090 self.failUnless(abs(output["renew"]["99_percentile"] - 990) < 1)
1091 self.failUnless(abs(output["renew"]["999_percentile"] - 999) < 1)
# "cancel" got only 10 samples, so the upper percentiles collapse to
# the max sample (18).
1093 self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1094 self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
1095 self.failUnless(abs(output["cancel"]["median"] - 10) < 1)
1096 self.failUnless(abs(output["cancel"]["90_percentile"] - 18) < 1)
1097 self.failUnless(abs(output["cancel"]["95_percentile"] - 18) < 1)
1098 self.failUnless(abs(output["cancel"]["99_percentile"] - 18) < 1)
1099 self.failUnless(abs(output["cancel"]["999_percentile"] - 18) < 1)
1101 self.failUnlessEqual(len(ss.latencies["get"]), 1)
1102 self.failUnless(abs(output["get"]["mean"] - 5) < 1)
1103 self.failUnless(abs(output["get"]["median"] - 5) < 1)
1104 self.failUnless(abs(output["get"]["90_percentile"] - 5) < 1)
1105 self.failUnless(abs(output["get"]["95_percentile"] - 5) < 1)
1106 self.failUnless(abs(output["get"]["99_percentile"] - 5) < 1)
1107 self.failUnless(abs(output["get"]["999_percentile"] - 5) < 1)