from twisted.trial import unittest

from twisted.internet import defer
import time, os.path, stat, re
import itertools

from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32
from allmydata.storage.server import StorageServer, storage_index_to_dir
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader
from allmydata.storage.common import DataTooLargeError
from allmydata.storage.lease import LeaseInfo
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
     ReadBucketProxy
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent
from allmydata.web.storage import StorageStatus, abbreviate_if_known, \
     remove_prefix

class Marker:
    pass

class FakeCanary:
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if self.ignore:
            return
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        if self.ignore:
            return
        del self.disconnectors[marker]
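
# (FakeCanary stands in for the foolscap "canary" object on which the
# server registers disconnect callbacks; test_disconnect below fires the
# stored callbacks by hand to simulate a dropped connection)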

class FakeStatsProvider:
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass

class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

class RemoteBucket:
    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)
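
# (RemoteBucket stands in for a foolscap RemoteReference: callRemote()
# dispatches to the local "remote_" method and wraps the result in a
# Deferred, which is the interface the Write/ReadBucketProxy classes expect)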

class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              block_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size_max=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
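        #
        # (for reference: the 0x24=36-byte header is the v1 layout, which
        # uses 4-byte size and offset fields; v2 widens those fields to 8
        # bytes, giving the 0x44=68-byte header, so that shares larger than
        # 4GiB can be described)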

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500

        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb,
                       data_size=95,
                       block_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size_max=len(uri_extension),
                       nodeid=None)

        d = bp.start()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = rbp_class(rb, peerid="abc", storage_index="")
            self.failUnless("to peer" in repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)

class FakeDiskStorageServer(StorageServer):
    def stat_disk(self, d):
        return self.DISKAVAIL
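
# (FakeDiskStorageServer lets a test control how much free disk space the
# server believes it has: stat_disk() is assumed here to be the server's
# hook around os.statvfs, and test_reserved_space below drives it through
# the DISKAVAIL attribute instead of real disk measurements)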

class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, reserved_space=0, klass=StorageServer):
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_large_share(self):
        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))

        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a
        # sparse file for this. :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()

        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
    test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X is the only system I know of in the desktop/server area that doesn't support efficient sparse files)."

    def test_dont_overfill_dirs(self):
        """
        This test asserts that if you add a second share whose storage index
        shares lots of leading bits with an extant share (but isn't the
        exact same storage index), this won't add an entry to the share
        directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
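
    # (background: the server lays shares out as
    # shares/<prefix>/<storage-index>/<shnum>, where <prefix> is the first
    # two characters of the base32-encoded storage index (see
    # storage_index_to_dir), so two indexes that encode to the same leading
    # characters must share a single prefix directory)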

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir))
        self.failIf(os.path.exists(incoming_prefix_dir))
        self.failUnless(os.path.exists(incoming_dir))

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        canary = FakeCanary()
        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    def test_reserved_space(self):
        ss = self.create("test_reserved_space", reserved_space=10000,
                         klass=FakeDiskStorageServer)
        # the FakeDiskStorageServer doesn't do real statvfs() calls
        ss.DISKAVAIL = 15000
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
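        # (LEASE_SIZE matches the serialized lease record: a 4-byte owner
        # number, a 32-byte renew secret, a 32-byte cancel secret, and a
        # 4-byte expiration timestamp, 72 bytes in all)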
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        for bw in writers.values():
            bw.remote_abort()
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grows to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # we have to manually decrease DISKAVAIL, since we're not doing real
        # statvfs calls
        ss.DISKAVAIL -= allocated

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        already3,writers3 = self.allocate(ss, "vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        for bw in writers3.values():
            bw.remote_abort()
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

    def test_seek_behavior(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")
        f2.close()

    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it. Only the proper cancel_secret should work
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)

        # cancel the first lease on si1, leaving the second and third in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second and third should make it go away
        ss.remote_cancel_lease("si1", cs2)
        ss.remote_cancel_lease("si1", cs2a)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)

        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"],
                             False)
        if "storage_server.disk_avail" in stats:
            # windows does not have os.statvfs, so it doesn't give us disk
            # stats. But if there are stats, readonly_storage means
            # disk_avail=0
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)

    def test_discard(self):
        # discard is really only used for other tests, but we test it anyway
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        canary = FakeCanary()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()

        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeroes.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)

    def test_advise_corruption(self):
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnless(si0_s in report_si0, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        report = f.read()
        f.close()
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si0_s) in report)
        self.failUnless("share_number: 0" in report)
        self.failUnless("This share smells funny." in report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        report = f.read()
        f.close()
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si1_s) in report)
        self.failUnless("share_number: 1" in report)
        self.failUnless("This share tastes like dust." in report)

class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                                for shnum in sharenums ] )
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)
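
    # (note that passing an empty test vector and empty write vector for a
    # share number is enough to create that share: the server creates any
    # share named in the test-and-write vector that doesn't already exist)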

    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
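        # (the answer is a 2-tuple: a boolean saying whether every test
        # vector matched, and a dict mapping each extant share number to the
        # data returned by the read vector, empty lists here since we passed
        # an empty readv)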

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
                              [])

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222")],
                            [(0, "x"*100)], None)},
                       [(0,12), (20,5)])
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111")],
                            [(0, "x"*100)], None)},
                       [(10,5)])
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

    def test_operators(self):
        # test operators, the data we're comparing is '11111' in all cases.
        # test both fail+pass, reset data after each one.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
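        # (each test vector is an (offset, length, operator, specimen) tuple:
        # the server reads length bytes at offset and compares them to the
        # specimen with the named operator; the write vectors in a batch are
        # applied only if every test vector matches)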
830 write("si1", secrets,
831 {0: ([], [(0,data)], None)},
837 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
842 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
843 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
844 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
847 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
852 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
853 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
856 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
861 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
862 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
866 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
871 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
872 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
875 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
880 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
881 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
884 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
889 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
890 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
894 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
899 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
900 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
903 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
908 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
909 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
913 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
918 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
919 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
922 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
927 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
928 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
932 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
937 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
938 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
941 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
946 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
947 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
950 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
955 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
956 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
960 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
965 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
966 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
969 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
974 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
975 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
978 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
983 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
984 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
987 # finally, test some operators against empty shares
988 answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
993 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
994 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

    def compare_leases_without_timestamps(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            num_a, a = leases_a[i]
            num_b, b = leases_b[i]
            self.failUnlessEqual(num_a, num_b)
            self.failUnlessEqual(a.owner_num, b.owner_num)
            self.failUnlessEqual(a.renew_secret, b.renew_secret)
            self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
            self.failUnlessEqual(a.nodeid, b.nodeid)

    def compare_leases(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            num_a, a = leases_a[i]
            num_b, b = leases_b[i]
            self.failUnlessEqual(num_a, num_b)
            self.failUnlessEqual(a.owner_num, b.owner_num)
            self.failUnlessEqual(a.renew_secret, b.renew_secret)
            self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
            self.failUnlessEqual(a.nodeid, b.nodeid)
            self.failUnlessEqual(a.expiration_time, b.expiration_time)

    def test_leases(self):
        ss = self.create("test_leases")
        def secrets(n):
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # re-allocate the slots and use the same secrets, that should update
        # the lease
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code. Use add_lease for one of them.
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(s0.debug_get_leases()), 2)
        secrets2 = secrets(2)
        ss.remote_add_lease("si1", secrets2[1], secrets2[2])
        self.failUnlessEqual(len(s0.debug_get_leases()), 3)
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        self.failUnlessEqual(len(s0.debug_get_leases()), 6)

        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])
        self.failUnlessEqual(len(s0.debug_get_leases()), 5)

        all_leases = s0.debug_get_leases()
        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        # alone.
        all_leases = s0.debug_get_leases()
        # renewing with a bogus token should prompt an error message

        # examine the exception thus raised, make sure the old nodeid is
        # present, to provide for share migration
        e = self.failUnlessRaises(IndexError,
                                  ss.remote_renew_lease, "si1",
                                  secrets(20)[1])
        e_s = str(e)
        self.failUnless("Unable to renew non-existent lease" in e_s)
        self.failUnless("I have leases accepted by nodeids:" in e_s)
        self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)

        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
                              secrets(20)[2])
        self.compare_leases(all_leases, s0.debug_get_leases())

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.compare_leases(all_leases, s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        # now cancel them all
        ss.remote_cancel_lease("si1", secrets(0)[2])
        ss.remote_cancel_lease("si1", secrets(1)[2])
        ss.remote_cancel_lease("si1", secrets(2)[2])
        ss.remote_cancel_lease("si1", secrets(3)[2])

        # the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        # cancelling a non-existent lease should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1", "nonsecret")

        # and the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})

        # cancelling a lease on a non-existent share should raise an
        # IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si2", "nonsecret")

    def test_remove(self):
        ss = self.create("test_remove")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        readv = ss.remote_slot_readv
        writev = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        # delete sh0 by setting its size to zero
        answer = writev("si1", secrets,
                        {0: ([], [], 0)},
                        [])
        # the answer should mention all the shares that existed before the
        # write
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
        # but a new read should show only sh1 and sh2
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
                             {1: [""], 2: [""]})

        # delete sh1 by setting its size to zero
        answer = writev("si1", secrets,
                        {1: ([], [], 0)},
                        [])
        self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
                             {2: [""]})

        # delete sh2 by setting its size to zero
        answer = writev("si1", secrets,
                        {2: ([], [], 0)},
                        [])
        self.failUnlessEqual(answer, (True, {2:[]}) )
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
                             {})
        # and the bucket directory should now be gone
        si = base32.b2a("si1")
        # note: this is a detail of the storage server implementation, and
        # may change in the future
        prefix = si[:2]
        prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
        bucketdir = os.path.join(prefixdir, si)
        self.failUnless(os.path.exists(prefixdir))
        self.failIf(os.path.exists(bucketdir))

class Stats(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_latencies(self):
        ss = self.create("test_latencies")
        for i in range(10000):
            ss.add_latency("allocate", 1.0 * i)
        for i in range(1000):
            ss.add_latency("renew", 1.0 * i)
        for i in range(10):
            ss.add_latency("cancel", 2.0 * i)
        ss.add_latency("get", 5.0)
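        # note: the server keeps only the most recent 1000 samples per
        # category, so the "allocate" percentiles below are computed over
        # samples 9000 through 9999.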

        output = ss.get_latencies()

        self.failUnlessEqual(sorted(output.keys()),
                             sorted(["allocate", "renew", "cancel", "get"]))
        self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
        self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
        self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
        self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
        self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
        self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
        self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)

        self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
        self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
        self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1)
        self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
        self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
        self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
        self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
        self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
        self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)

        self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
        self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
        self.failUnless(abs(output["cancel"]["01_0_percentile"] - 0) < 1)
        self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1)
        self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
        self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)

        self.failUnlessEqual(len(ss.latencies["get"]), 1)
        self.failUnless(abs(output["get"]["mean"] - 5) < 1)
        self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)

class NoStatvfsServer(StorageServer):
    def do_statvfs(self):
        raise AttributeError

class WebStatus(unittest.TestCase):

    def test_no_server(self):
        w = StorageStatus(None)
        html = w.renderSynchronously()
        self.failUnless("<h1>No Storage Server Running</h1>" in html, html)

    def remove_tags(self, s):
        s = re.sub(r'<[^>]*>', ' ', s)
        s = re.sub(r'\s+', ' ', s)
        return s

    def test_status(self):
        basedir = "storage/WebStatus/status"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnless("<h1>Storage Server Status</h1>" in html, html)
        s = self.remove_tags(html)
        self.failUnless("Accepting new shares: Yes" in s, s)
        self.failUnless("Reserved space: - 0B" in s, s)

    def test_status_no_statvfs(self):
        # windows has no os.statvfs. Make sure the code handles that even on
        # posix platforms.
        basedir = "storage/WebStatus/status_no_statvfs"
        fileutil.make_dirs(basedir)
        ss = NoStatvfsServer(basedir, "\x00" * 20)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnless("<h1>Storage Server Status</h1>" in html, html)
        s = self.remove_tags(html)
        self.failUnless("Accepting new shares: Yes" in s, s)
        self.failUnless("Total disk space: ?" in s, s)

    def test_readonly(self):
        basedir = "storage/WebStatus/readonly"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnless("<h1>Storage Server Status</h1>" in html, html)
        s = self.remove_tags(html)
        self.failUnless("Accepting new shares: No" in s, s)

    def test_reserved(self):
        basedir = "storage/WebStatus/reserved"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnless("<h1>Storage Server Status</h1>" in html, html)
        s = self.remove_tags(html)
        self.failUnless("Reserved space: - 10.00MB" in s, s)

    def test_util(self):
        self.failUnlessEqual(abbreviate_if_known(None), "?")
        self.failUnlessEqual(abbreviate_if_known(10e6), "10.00MB")
        self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
        self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)