2 from twisted.trial import unittest
4 from twisted.internet import defer
5 import time, os.path, stat
7 from allmydata import interfaces
8 from allmydata.util import fileutil, hashutil
9 from allmydata.storage import BucketWriter, BucketReader, \
10 StorageServer, MutableShareFile, \
11 storage_index_to_dir, DataTooLargeError, LeaseInfo
12 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
14 from allmydata.interfaces import BadWriteEnablerError
15 from allmydata.test.common import LoggingServiceParent
20 def __init__(self, ignore_disconnectors=False):
21 self.ignore = ignore_disconnectors
22 self.disconnectors = {}
23 def notifyOnDisconnect(self, f, *args, **kwargs):
27 self.disconnectors[m] = (f, args, kwargs)
29 def dontNotifyOnDisconnect(self, marker):
32 del self.disconnectors[marker]
34 class FakeStatsProvider:
35 def count(self, name, delta=1):
37 def register_producer(self, producer):
40 class Bucket(unittest.TestCase):
41 def make_workdir(self, name):
42 basedir = os.path.join("storage", "Bucket", name)
43 incoming = os.path.join(basedir, "tmp", "bucket")
44 final = os.path.join(basedir, "bucket")
45 fileutil.make_dirs(basedir)
46 fileutil.make_dirs(os.path.join(basedir, "tmp"))
47 return incoming, final
49 def bucket_writer_closed(self, bw, consumed):
51 def add_latency(self, category, latency):
53 def count(self, name, delta=1):
58 renew_secret = os.urandom(32)
59 cancel_secret = os.urandom(32)
60 expiration_time = time.time() + 5000
61 return LeaseInfo(owner_num, renew_secret, cancel_secret,
62 expiration_time, "\x00" * 20)
64 def test_create(self):
65 incoming, final = self.make_workdir("test_create")
66 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
68 bw.remote_write(0, "a"*25)
69 bw.remote_write(25, "b"*25)
70 bw.remote_write(50, "c"*25)
71 bw.remote_write(75, "d"*7)
74 def test_readwrite(self):
75 incoming, final = self.make_workdir("test_readwrite")
76 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
78 bw.remote_write(0, "a"*25)
79 bw.remote_write(25, "b"*25)
80 bw.remote_write(50, "c"*7) # last block may be short
84 br = BucketReader(self, bw.finalhome)
85 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
86 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
87 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
91 def callRemote(self, methname, *args, **kwargs):
93 meth = getattr(self.target, "remote_" + methname)
94 return meth(*args, **kwargs)
95 return defer.maybeDeferred(_call)
97 class BucketProxy(unittest.TestCase):
98 def make_bucket(self, name, size):
99 basedir = os.path.join("storage", "BucketProxy", name)
100 incoming = os.path.join(basedir, "tmp", "bucket")
101 final = os.path.join(basedir, "bucket")
102 fileutil.make_dirs(basedir)
103 fileutil.make_dirs(os.path.join(basedir, "tmp"))
104 bw = BucketWriter(self, incoming, final, size, self.make_lease(),
110 def make_lease(self):
112 renew_secret = os.urandom(32)
113 cancel_secret = os.urandom(32)
114 expiration_time = time.time() + 5000
115 return LeaseInfo(owner_num, renew_secret, cancel_secret,
116 expiration_time, "\x00" * 20)
118 def bucket_writer_closed(self, bw, consumed):
120 def add_latency(self, category, latency):
122 def count(self, name, delta=1):
125 def test_create(self):
126 bw, rb, sharefname = self.make_bucket("test_create", 500)
127 bp = WriteBucketProxy(rb,
132 uri_extension_size=500, nodeid=None)
133 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
135 def _do_test_readwrite(self, header_size, wbp_class, rbp_class):
136 # Let's pretend each share has 100 bytes of data, and that there are
137 # 4 segments (25 bytes each), and 8 shares total. So the three
138 # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
139 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
140 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
141 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
142 # long. That should make the whole share:
144 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
145 # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
147 sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
149 plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
151 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
153 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
155 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
157 uri_extension = "s" + "E"*498 + "e"
159 bw, rb, sharefname = self.make_bucket("test_readwrite", sharesize)
165 uri_extension_size=len(uri_extension),
169 d.addCallback(lambda res: bp.put_block(0, "a"*25))
170 d.addCallback(lambda res: bp.put_block(1, "b"*25))
171 d.addCallback(lambda res: bp.put_block(2, "c"*25))
172 d.addCallback(lambda res: bp.put_block(3, "d"*20))
173 d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
174 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
175 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
176 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
177 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
178 d.addCallback(lambda res: bp.close())
180 # now read everything back
181 def _start_reading(res):
182 br = BucketReader(self, sharefname)
185 rbp = rbp_class(rb, peerid="abc")
186 self.failUnless("to peer" in repr(rbp))
187 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
189 d1 = rbp.startIfNecessary()
190 d1.addCallback(lambda res: rbp.startIfNecessary()) # idempotent
191 d1.addCallback(lambda res: rbp.get_block(0))
192 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
193 d1.addCallback(lambda res: rbp.get_block(1))
194 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
195 d1.addCallback(lambda res: rbp.get_block(2))
196 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
197 d1.addCallback(lambda res: rbp.get_block(3))
198 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
200 d1.addCallback(lambda res: rbp.get_plaintext_hashes())
201 d1.addCallback(lambda res:
202 self.failUnlessEqual(res, plaintext_hashes))
203 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
204 d1.addCallback(lambda res:
205 self.failUnlessEqual(res, crypttext_hashes))
206 d1.addCallback(lambda res: rbp.get_block_hashes())
207 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
208 d1.addCallback(lambda res: rbp.get_share_hashes())
209 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
210 d1.addCallback(lambda res: rbp.get_uri_extension())
211 d1.addCallback(lambda res:
212 self.failUnlessEqual(res, uri_extension))
216 d.addCallback(_start_reading)
    def test_readwrite_v1(self):
        # exercise the v1 share layout: 0x24-byte header, via WriteBucketProxy
        return self._do_test_readwrite(0x24, WriteBucketProxy, ReadBucketProxy)
    def test_readwrite_v2(self):
        # exercise the v2 share layout: 0x44-byte header, via WriteBucketProxy_v2
        return self._do_test_readwrite(0x44, WriteBucketProxy_v2, ReadBucketProxy)
226 class Server(unittest.TestCase):
229 self.sparent = LoggingServiceParent()
230 self._lease_secret = itertools.count()
232 return self.sparent.stopService()
234 def workdir(self, name):
235 basedir = os.path.join("storage", "Server", name)
238 def create(self, name, sizelimit=None):
239 workdir = self.workdir(name)
240 ss = StorageServer(workdir, sizelimit,
241 stats_provider=FakeStatsProvider())
242 ss.setNodeID("\x00" * 20)
243 ss.setServiceParent(self.sparent)
    def test_create(self):
        # smoke test: a StorageServer can be instantiated and started
        ss = self.create("test_create")
249 def allocate(self, ss, storage_index, sharenums, size, canary=None):
250 renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
251 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
253 canary = FakeCanary()
254 return ss.remote_allocate_buckets(storage_index,
255 renew_secret, cancel_secret,
256 sharenums, size, canary)
258 def test_dont_overfill_dirs(self):
        This test asserts that if you add a second share whose storage index
        shares lots of leading bits with an extant share (but isn't the exact
        same storage index), this won't add an entry to the share directory.
264 ss = self.create("test_dont_overfill_dirs")
265 already, writers = self.allocate(ss, "storageindex", [0], 10)
266 for i, wb in writers.items():
267 wb.remote_write(0, "%10d" % i)
269 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
271 children_of_storedir = set(os.listdir(storedir))
273 # Now store another one under another storageindex that has leading
274 # chars the same as the first storageindex.
275 already, writers = self.allocate(ss, "storageindey", [0], 10)
276 for i, wb in writers.items():
277 wb.remote_write(0, "%10d" % i)
279 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
281 new_children_of_storedir = set(os.listdir(storedir))
282 self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
284 def test_remove_incoming(self):
285 ss = self.create("test_remove_incoming")
286 already, writers = self.allocate(ss, "vid", range(3), 10)
287 for i,wb in writers.items():
288 wb.remote_write(0, "%10d" % i)
290 incoming_share_dir = wb.incominghome
291 incoming_bucket_dir = os.path.dirname(incoming_share_dir)
292 incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
293 incoming_dir = os.path.dirname(incoming_prefix_dir)
294 self.failIf(os.path.exists(incoming_bucket_dir))
295 self.failIf(os.path.exists(incoming_prefix_dir))
296 self.failUnless(os.path.exists(incoming_dir))
298 def test_allocate(self):
299 ss = self.create("test_allocate")
301 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
303 canary = FakeCanary()
304 already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
305 self.failUnlessEqual(already, set())
306 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
308 # while the buckets are open, they should not count as readable
309 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
312 for i,wb in writers.items():
313 wb.remote_write(0, "%25d" % i)
315 # aborting a bucket that was already closed is a no-op
318 # now they should be readable
319 b = ss.remote_get_buckets("allocate")
320 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
321 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
323 # now if we ask about writing again, the server should offer those
324 # three buckets as already present. It should offer them even if we
325 # don't ask about those specific ones.
326 already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
327 self.failUnlessEqual(already, set([0,1,2]))
328 self.failUnlessEqual(set(writers.keys()), set([3,4]))
330 # while those two buckets are open for writing, the server should
331 # refuse to offer them to uploaders
333 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
334 self.failUnlessEqual(already2, set([0,1,2]))
335 self.failUnlessEqual(set(writers2.keys()), set([5]))
337 # aborting the writes should remove the tempfiles
338 for i,wb in writers2.items():
340 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
341 self.failUnlessEqual(already2, set([0,1,2]))
342 self.failUnlessEqual(set(writers2.keys()), set([5]))
344 for i,wb in writers2.items():
346 for i,wb in writers.items():
349 def test_disconnect(self):
350 # simulate a disconnection
351 ss = self.create("test_disconnect")
352 canary = FakeCanary()
353 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
354 self.failUnlessEqual(already, set())
355 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
356 for (f,args,kwargs) in canary.disconnectors.values():
361 # that ought to delete the incoming shares
362 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
363 self.failUnlessEqual(already, set())
364 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
366 def test_sizelimits(self):
367 ss = self.create("test_sizelimits", 5000)
368 # a newly created and filled share incurs this much overhead, beyond
369 # the size we request.
371 LEASE_SIZE = 4+32+32+4
372 canary = FakeCanary(True)
373 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
374 self.failUnlessEqual(len(writers), 3)
375 # now the StorageServer should have 3000 bytes provisionally
376 # allocated, allowing only 2000 more to be claimed
377 self.failUnlessEqual(len(ss._active_writers), 3)
379 # allocating 1001-byte shares only leaves room for one
380 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
381 self.failUnlessEqual(len(writers2), 1)
382 self.failUnlessEqual(len(ss._active_writers), 4)
384 # we abandon the first set, so their provisional allocation should be
388 self.failUnlessEqual(len(ss._active_writers), 1)
389 # now we have a provisional allocation of 1001 bytes
391 # and we close the second set, so their provisional allocation should
392 # become real, long-term allocation, and grows to include the
394 for bw in writers2.values():
395 bw.remote_write(0, "a"*25)
400 self.failUnlessEqual(len(ss._active_writers), 0)
402 allocated = 1001 + OVERHEAD + LEASE_SIZE
403 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
404 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
405 already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
406 self.failUnlessEqual(len(writers3), 39)
407 self.failUnlessEqual(len(ss._active_writers), 39)
411 self.failUnlessEqual(len(ss._active_writers), 0)
412 ss.disownServiceParent()
415 # creating a new StorageServer in the same directory should see the
418 # metadata that goes into the share file is counted upon share close,
419 # as well as at startup. metadata that goes into other files will not
420 # be counted until the next startup, so if we were creating any
421 # extra-file metadata, the allocation would be more than 'allocated'
422 # and this test would need to be changed.
423 ss = self.create("test_sizelimits", 5000)
424 already4,writers4 = self.allocate(ss, "vid4", range(100), 100, canary)
425 self.failUnlessEqual(len(writers4), 39)
426 self.failUnlessEqual(len(ss._active_writers), 39)
429 basedir = self.workdir("test_seek_behavior")
430 fileutil.make_dirs(basedir)
431 filename = os.path.join(basedir, "testfile")
432 f = open(filename, "wb")
435 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
436 # files. mode="a" preserves previous contents but does not allow
437 # seeking-to-create-holes. mode="r+" allows both.
438 f = open(filename, "rb+")
442 filelen = os.stat(filename)[stat.ST_SIZE]
443 self.failUnlessEqual(filelen, 100+3)
444 f2 = open(filename, "rb")
445 self.failUnlessEqual(f2.read(5), "start")
448 def test_leases(self):
449 ss = self.create("test_leases")
450 canary = FakeCanary()
454 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
455 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
456 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
457 sharenums, size, canary)
458 self.failUnlessEqual(len(already), 0)
459 self.failUnlessEqual(len(writers), 5)
460 for wb in writers.values():
463 leases = list(ss.get_leases("si0"))
464 self.failUnlessEqual(len(leases), 1)
465 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
467 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
468 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
469 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
470 sharenums, size, canary)
471 for wb in writers.values():
474 # take out a second lease on si1
475 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
476 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
477 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
478 sharenums, size, canary)
479 self.failUnlessEqual(len(already), 5)
480 self.failUnlessEqual(len(writers), 0)
482 leases = list(ss.get_leases("si1"))
483 self.failUnlessEqual(len(leases), 2)
484 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
486 # check that si0 is readable
487 readers = ss.remote_get_buckets("si0")
488 self.failUnlessEqual(len(readers), 5)
490 # renew the first lease. Only the proper renew_secret should work
491 ss.remote_renew_lease("si0", rs0)
492 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
493 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
495 # check that si0 is still readable
496 readers = ss.remote_get_buckets("si0")
497 self.failUnlessEqual(len(readers), 5)
500 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
501 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
502 ss.remote_cancel_lease("si0", cs0)
504 # si0 should now be gone
505 readers = ss.remote_get_buckets("si0")
506 self.failUnlessEqual(len(readers), 0)
507 # and the renew should no longer work
508 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
511 # cancel the first lease on si1, leaving the second in place
512 ss.remote_cancel_lease("si1", cs1)
513 readers = ss.remote_get_buckets("si1")
514 self.failUnlessEqual(len(readers), 5)
515 # the corresponding renew should no longer work
516 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
518 leases = list(ss.get_leases("si1"))
519 self.failUnlessEqual(len(leases), 1)
520 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2]))
522 ss.remote_renew_lease("si1", rs2)
523 # cancelling the second should make it go away
524 ss.remote_cancel_lease("si1", cs2)
525 readers = ss.remote_get_buckets("si1")
526 self.failUnlessEqual(len(readers), 0)
527 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
528 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
530 leases = list(ss.get_leases("si1"))
531 self.failUnlessEqual(len(leases), 0)
534 # test overlapping uploads
535 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
536 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
537 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
538 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
539 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
540 sharenums, size, canary)
541 self.failUnlessEqual(len(already), 0)
542 self.failUnlessEqual(len(writers), 5)
543 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
544 sharenums, size, canary)
545 self.failUnlessEqual(len(already2), 0)
546 self.failUnlessEqual(len(writers2), 0)
547 for wb in writers.values():
550 leases = list(ss.get_leases("si3"))
551 self.failUnlessEqual(len(leases), 1)
553 already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
554 sharenums, size, canary)
555 self.failUnlessEqual(len(already3), 5)
556 self.failUnlessEqual(len(writers3), 0)
558 leases = list(ss.get_leases("si3"))
559 self.failUnlessEqual(len(leases), 2)
561 def test_readonly(self):
562 workdir = self.workdir("test_readonly")
563 ss = StorageServer(workdir, readonly_storage=True)
564 ss.setNodeID("\x00" * 20)
565 ss.setServiceParent(self.sparent)
567 canary = FakeCanary()
568 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
569 self.failUnlessEqual(already, set())
570 self.failUnlessEqual(writers, {})
572 def test_discard(self):
573 # discard is really only used for other tests, but we test it anyways
574 workdir = self.workdir("test_discard")
575 ss = StorageServer(workdir, discard_storage=True)
576 ss.setNodeID("\x00" * 20)
577 ss.setServiceParent(self.sparent)
579 canary = FakeCanary()
580 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
581 self.failUnlessEqual(already, set())
582 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
583 for i,wb in writers.items():
584 wb.remote_write(0, "%25d" % i)
586 # since we discard the data, the shares should be present but sparse.
587 # Since we write with some seeks, the data we read back will be all
589 b = ss.remote_get_buckets("vid")
590 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
591 self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
595 class MutableServer(unittest.TestCase):
598 self.sparent = LoggingServiceParent()
599 self._lease_secret = itertools.count()
601 return self.sparent.stopService()
603 def workdir(self, name):
604 basedir = os.path.join("storage", "MutableServer", name)
607 def create(self, name, sizelimit=None):
608 workdir = self.workdir(name)
609 ss = StorageServer(workdir, sizelimit)
610 ss.setServiceParent(self.sparent)
611 ss.setNodeID("\x00" * 20)
    def test_create(self):
        # smoke test: a StorageServer for mutable slots can be instantiated
        ss = self.create("test_create")
617 def write_enabler(self, we_tag):
618 return hashutil.tagged_hash("we_blah", we_tag)
620 def renew_secret(self, tag):
621 return hashutil.tagged_hash("renew_blah", str(tag))
623 def cancel_secret(self, tag):
624 return hashutil.tagged_hash("cancel_blah", str(tag))
626 def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
627 write_enabler = self.write_enabler(we_tag)
628 renew_secret = self.renew_secret(lease_tag)
629 cancel_secret = self.cancel_secret(lease_tag)
630 rstaraw = ss.remote_slot_testv_and_readv_and_writev
631 testandwritev = dict( [ (shnum, ([], [], None) )
632 for shnum in sharenums ] )
634 rc = rstaraw(storage_index,
635 (write_enabler, renew_secret, cancel_secret),
638 (did_write, readv_data) = rc
639 self.failUnless(did_write)
640 self.failUnless(isinstance(readv_data, dict))
641 self.failUnlessEqual(len(readv_data), 0)
643 def test_container_size(self):
644 ss = self.create("test_container_size")
645 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
647 rstaraw = ss.remote_slot_testv_and_readv_and_writev
648 secrets = ( self.write_enabler("we1"),
649 self.renew_secret("we1"),
650 self.cancel_secret("we1") )
651 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
652 answer = rstaraw("si1", secrets,
653 {0: ([], [(0,data)], len(data)+12)},
655 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
657 # trying to make the container too large will raise an exception
658 TOOBIG = MutableShareFile.MAX_SIZE + 10
659 self.failUnlessRaises(DataTooLargeError,
660 rstaraw, "si1", secrets,
661 {0: ([], [(0,data)], TOOBIG)},
664 # it should be possible to make the container smaller, although at
665 # the moment this doesn't actually affect the share
666 answer = rstaraw("si1", secrets,
667 {0: ([], [(0,data)], len(data)+8)},
669 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
671 def test_allocate(self):
672 ss = self.create("test_allocate")
673 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
676 read = ss.remote_slot_readv
677 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
679 self.failUnlessEqual(read("si1", [], [(0, 10)]),
680 {0: [""], 1: [""], 2: [""]})
681 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
685 secrets = ( self.write_enabler("we1"),
686 self.renew_secret("we1"),
687 self.cancel_secret("we1") )
688 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
689 write = ss.remote_slot_testv_and_readv_and_writev
690 answer = write("si1", secrets,
691 {0: ([], [(0,data)], None)},
693 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
695 self.failUnlessEqual(read("si1", [0], [(0,20)]),
696 {0: ["00000000001111111111"]})
697 self.failUnlessEqual(read("si1", [0], [(95,10)]),
699 #self.failUnlessEqual(s0.remote_get_length(), 100)
701 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
702 f = self.failUnlessRaises(BadWriteEnablerError,
703 write, "si1", bad_secrets,
705 self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)
707 # this testv should fail
708 answer = write("si1", secrets,
709 {0: ([(0, 12, "eq", "444444444444"),
710 (20, 5, "eq", "22222"),
717 self.failUnlessEqual(answer, (False,
718 {0: ["000000000011", "22222"],
722 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
725 answer = write("si1", secrets,
726 {0: ([(10, 5, "lt", "11111"),
733 self.failUnlessEqual(answer, (False,
738 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
741 def test_operators(self):
742 # test operators, the data we're comparing is '11111' in all cases.
743 # test both fail+pass, reset data after each one.
744 ss = self.create("test_operators")
746 secrets = ( self.write_enabler("we1"),
747 self.renew_secret("we1"),
748 self.cancel_secret("we1") )
749 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
750 write = ss.remote_slot_testv_and_readv_and_writev
751 read = ss.remote_slot_readv
754 write("si1", secrets,
755 {0: ([], [(0,data)], None)},
761 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
766 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
767 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
768 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
771 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
776 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
777 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
780 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
785 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
786 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
790 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
795 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
796 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
799 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
804 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
805 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
808 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
813 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
814 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
818 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
823 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
824 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
827 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
832 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
833 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
837 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
842 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
843 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
846 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
851 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
852 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
856 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
861 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
862 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
865 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
870 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
871 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
874 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
879 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
880 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
884 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
889 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
890 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
893 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
898 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
899 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
902 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
907 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
908 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
911 # finally, test some operators against empty shares
912 answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
917 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
918 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
921 def test_readv(self):
922 ss = self.create("test_readv")
923 secrets = ( self.write_enabler("we1"),
924 self.renew_secret("we1"),
925 self.cancel_secret("we1") )
926 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
927 write = ss.remote_slot_testv_and_readv_and_writev
928 read = ss.remote_slot_readv
929 data = [("%d" % i) * 100 for i in range(3)]
930 rc = write("si1", secrets,
931 {0: ([], [(0,data[0])], None),
932 1: ([], [(0,data[1])], None),
933 2: ([], [(0,data[2])], None),
935 self.failUnlessEqual(rc, (True, {}))
937 answer = read("si1", [], [(0, 10)])
938 self.failUnlessEqual(answer, {0: ["0"*10],
942 def compare_leases_without_timestamps(self, leases_a, leases_b):
943 self.failUnlessEqual(len(leases_a), len(leases_b))
944 for i in range(len(leases_a)):
945 num_a, a = leases_a[i]
946 num_b, b = leases_b[i]
947 self.failUnlessEqual(num_a, num_b)
948 self.failUnlessEqual(a.owner_num, b.owner_num)
949 self.failUnlessEqual(a.renew_secret, b.renew_secret)
950 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
951 self.failUnlessEqual(a.nodeid, b.nodeid)
953 def compare_leases(self, leases_a, leases_b):
954 self.failUnlessEqual(len(leases_a), len(leases_b))
955 for i in range(len(leases_a)):
956 num_a, a = leases_a[i]
957 num_b, b = leases_b[i]
958 self.failUnlessEqual(num_a, num_b)
959 self.failUnlessEqual(a.owner_num, b.owner_num)
960 self.failUnlessEqual(a.renew_secret, b.renew_secret)
961 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
962 self.failUnlessEqual(a.nodeid, b.nodeid)
963 self.failUnlessEqual(a.expiration_time, b.expiration_time)
965 def test_leases(self):
966 ss = self.create("test_leases", sizelimit=1000*1000)
968 return ( self.write_enabler("we1"),
969 self.renew_secret("we1-%d" % n),
970 self.cancel_secret("we1-%d" % n) )
971 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
972 write = ss.remote_slot_testv_and_readv_and_writev
973 read = ss.remote_slot_readv
974 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
975 self.failUnlessEqual(rc, (True, {}))
977 # create a random non-numeric file in the bucket directory, to
978 # exercise the code that's supposed to ignore those.
979 bucket_dir = os.path.join(self.workdir("test_leases"),
980 "shares", storage_index_to_dir("si1"))
981 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
982 f.write("you ought to be ignoring me\n")
985 # re-allocate the slots and use the same secrets, that should update
987 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
990 ss.remote_renew_lease("si1", secrets(0)[1])
992 # now allocate them with a bunch of different secrets, to trigger the
993 # extended lease code
994 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
995 write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
996 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
997 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
998 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1000 # cancel one of them
1001 ss.remote_cancel_lease("si1", secrets(5)[2])
1003 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1004 all_leases = s0.debug_get_leases()
1005 self.failUnlessEqual(len(all_leases), 5)
1007 # and write enough data to expand the container, forcing the server
1008 # to move the leases
1009 write("si1", secrets(0),
1010 {0: ([], [(0,data)], 200), },
1013 # read back the leases, make sure they're still intact.
1014 self.compare_leases_without_timestamps(all_leases,
1015 s0.debug_get_leases())
1017 ss.remote_renew_lease("si1", secrets(0)[1])
1018 ss.remote_renew_lease("si1", secrets(1)[1])
1019 ss.remote_renew_lease("si1", secrets(2)[1])
1020 ss.remote_renew_lease("si1", secrets(3)[1])
1021 ss.remote_renew_lease("si1", secrets(4)[1])
1022 self.compare_leases_without_timestamps(all_leases,
1023 s0.debug_get_leases())
1024 # get a new copy of the leases, with the current timestamps. Reading
1025 # data and failing to renew/cancel leases should leave the timestamps
1027 all_leases = s0.debug_get_leases()
1028 # renewing with a bogus token should prompt an error message
1030 # examine the exception thus raised, make sure the old nodeid is
1031 # present, to provide for share migration
1032 e = self.failUnlessRaises(IndexError,
1033 ss.remote_renew_lease, "si1",
1036 self.failUnless("Unable to renew non-existent lease" in e_s)
1037 self.failUnless("I have leases accepted by nodeids:" in e_s)
1038 self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)
1040 # same for cancelling
1041 self.failUnlessRaises(IndexError,
1042 ss.remote_cancel_lease, "si1",
1044 self.compare_leases(all_leases, s0.debug_get_leases())
1046 # reading shares should not modify the timestamp
1047 read("si1", [], [(0,200)])
1048 self.compare_leases(all_leases, s0.debug_get_leases())
1050 write("si1", secrets(0),
1051 {0: ([], [(200, "make me bigger")], None)}, [])
1052 self.compare_leases_without_timestamps(all_leases,
1053 s0.debug_get_leases())
1055 write("si1", secrets(0),
1056 {0: ([], [(500, "make me really bigger")], None)}, [])
1057 self.compare_leases_without_timestamps(all_leases,
1058 s0.debug_get_leases())
1060 # now cancel them all
1061 ss.remote_cancel_lease("si1", secrets(0)[2])
1062 ss.remote_cancel_lease("si1", secrets(1)[2])
1063 ss.remote_cancel_lease("si1", secrets(2)[2])
1064 ss.remote_cancel_lease("si1", secrets(3)[2])
1066 # the slot should still be there
1067 remaining_shares = read("si1", [], [(0,10)])
1068 self.failUnlessEqual(len(remaining_shares), 1)
1069 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1071 # cancelling a non-existent lease should raise an IndexError
1072 self.failUnlessRaises(IndexError,
1073 ss.remote_cancel_lease, "si1", "nonsecret")
1075 # and the slot should still be there
1076 remaining_shares = read("si1", [], [(0,10)])
1077 self.failUnlessEqual(len(remaining_shares), 1)
1078 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1080 ss.remote_cancel_lease("si1", secrets(4)[2])
1081 # now the slot should be gone
1082 no_shares = read("si1", [], [(0,10)])
1083 self.failUnlessEqual(no_shares, {})
1085 # cancelling a lease on a non-existent share should raise an IndexError
1086 self.failUnlessRaises(IndexError,
1087 ss.remote_cancel_lease, "si2", "nonsecret")
1090 class Stats(unittest.TestCase):
1093 self.sparent = LoggingServiceParent()
1094 self._lease_secret = itertools.count()
1096 return self.sparent.stopService()
1098 def workdir(self, name):
1099 basedir = os.path.join("storage", "Server", name)
1102 def create(self, name, sizelimit=None):
1103 workdir = self.workdir(name)
1104 ss = StorageServer(workdir, sizelimit)
1105 ss.setNodeID("\x00" * 20)
1106 ss.setServiceParent(self.sparent)
1109 def test_latencies(self):
1110 ss = self.create("test_latencies")
1111 for i in range(10000):
1112 ss.add_latency("allocate", 1.0 * i)
1113 for i in range(1000):
1114 ss.add_latency("renew", 1.0 * i)
1116 ss.add_latency("cancel", 2.0 * i)
1117 ss.add_latency("get", 5.0)
1119 output = ss.get_latencies()
1121 self.failUnlessEqual(sorted(output.keys()),
1122 sorted(["allocate", "renew", "cancel", "get"]))
1123 self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1124 self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
1125 self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
1126 self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
1127 self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
1128 self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
1129 self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
1130 self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
1131 self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)
1133 self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1134 self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
1135 self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1)
1136 self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
1137 self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
1138 self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
1139 self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
1140 self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
1141 self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)
1143 self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1144 self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
1145 self.failUnless(abs(output["cancel"]["01_0_percentile"] - 0) < 1)
1146 self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1)
1147 self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
1148 self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
1149 self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
1150 self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
1151 self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)
1153 self.failUnlessEqual(len(ss.latencies["get"]), 1)
1154 self.failUnless(abs(output["get"]["mean"] - 5) < 1)
1155 self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
1156 self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
1157 self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
1158 self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
1159 self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
1160 self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
1161 self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)