2 import time, os.path, stat, re
4 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.application import service
8 from foolscap import eventual
10 from allmydata import interfaces
11 from allmydata.util import fileutil, hashutil, base32, pollmixin
12 from allmydata.storage.server import StorageServer, storage_index_to_dir
13 from allmydata.storage.mutable import MutableShareFile
14 from allmydata.storage.immutable import BucketWriter, BucketReader
15 from allmydata.storage.common import DataTooLargeError
16 from allmydata.storage.lease import LeaseInfo
17 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
19 from allmydata.interfaces import BadWriteEnablerError
20 from allmydata.test.common import LoggingServiceParent
21 from allmydata.web.storage import StorageStatus, remove_prefix
# Fake Foolscap "canary" (RemoteReference stand-in) used to exercise the
# server's disconnect-notification path.
# NOTE(review): the enclosing `class FakeCanary` header and several body lines
# (the marker creation in notifyOnDisconnect, and presumably `if self.ignore`
# guards) are missing from this listing — restore from version control.
26 def __init__(self, ignore_disconnectors=False):
27 self.ignore = ignore_disconnectors
28 self.disconnectors = {}
29 def notifyOnDisconnect(self, f, *args, **kwargs):
# `m` (the marker key) is created on a line missing from this listing.
33 self.disconnectors[m] = (f, args, kwargs)
35 def dontNotifyOnDisconnect(self, marker):
38 del self.disconnectors[marker]
# Stub stats provider handed to StorageServer: accepts counter increments and
# producer registrations and discards them.
# NOTE(review): the `pass` bodies of both methods are missing from this listing.
40 class FakeStatsProvider:
41 def count(self, name, delta=1):
43 def register_producer(self, producer):
46 class Bucket(unittest.TestCase):
# Create storage/Bucket/<name>/ and its tmp/ subdirectory; return the
# (incoming, final) paths a share occupies while being written and after close.
47 def make_workdir(self, name):
48 basedir = os.path.join("storage", "Bucket", name)
49 incoming = os.path.join(basedir, "tmp", "bucket")
50 final = os.path.join(basedir, "bucket")
51 fileutil.make_dirs(basedir)
52 fileutil.make_dirs(os.path.join(basedir, "tmp"))
53 return incoming, final
# This TestCase doubles as the BucketWriter's "server" object, so these three
# hooks are no-op stubs (their `pass` bodies are missing from this listing).
55 def bucket_writer_closed(self, bw, consumed):
57 def add_latency(self, category, latency):
59 def count(self, name, delta=1):
# NOTE(review): the `def make_lease(self):` header and the `owner_num = 0`
# assignment are missing from this listing. The surviving lines build a
# LeaseInfo with random 32-byte secrets, an expiration ~5000s in the future,
# and an all-zero 20-byte nodeid.
64 renew_secret = os.urandom(32)
65 cancel_secret = os.urandom(32)
66 expiration_time = time.time() + 5000
67 return LeaseInfo(owner_num, renew_secret, cancel_secret,
68 expiration_time, "\x00" * 20)
# Write 82 bytes into a fresh 200-byte BucketWriter in four sequential chunks.
70 def test_create(self):
71 incoming, final = self.make_workdir("test_create")
# NOTE(review): the continuation line of this BucketWriter(...) call (the
# canary argument) and the trailing bw.remote_close() are missing here.
72 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
74 bw.remote_write(0, "a"*25)
75 bw.remote_write(25, "b"*25)
76 bw.remote_write(50, "c"*25)
77 bw.remote_write(75, "d"*7)
# Round trip: write 57 bytes through a BucketWriter, then read them back
# byte-for-byte through a BucketReader over the finalized share file.
80 def test_readwrite(self):
81 incoming, final = self.make_workdir("test_readwrite")
# NOTE(review): the BucketWriter(...) continuation line and the
# bw.remote_close() call are missing from this listing.
82 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
84 bw.remote_write(0, "a"*25)
85 bw.remote_write(25, "b"*25)
86 bw.remote_write(50, "c"*7) # last block may be short
90 br = BucketReader(self, bw.finalhome)
91 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
92 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
93 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
# Fragment of a fake RemoteReference (its class header is missing from this
# listing): callRemote("foo", ...) dispatches to self.target.remote_foo(...)
# wrapped in maybeDeferred, mimicking a Foolscap remote invocation.
97 def callRemote(self, methname, *args, **kwargs):
# NOTE(review): the inner `def _call():` header (original line 98) is missing.
99 meth = getattr(self.target, "remote_" + methname)
100 return meth(*args, **kwargs)
101 return defer.maybeDeferred(_call)
103 class BucketProxy(unittest.TestCase):
# Build a size-byte BucketWriter under storage/BucketProxy/<name>/.
# NOTE(review): the tail of this method (wrapping bw in a fake RemoteBucket
# and the return statement, original lines 111-115) is missing here.
104 def make_bucket(self, name, size):
105 basedir = os.path.join("storage", "BucketProxy", name)
106 incoming = os.path.join(basedir, "tmp", "bucket")
107 final = os.path.join(basedir, "bucket")
108 fileutil.make_dirs(basedir)
109 fileutil.make_dirs(os.path.join(basedir, "tmp"))
110 bw = BucketWriter(self, incoming, final, size, self.make_lease(),
# Build a LeaseInfo with random secrets and a ~5000s expiration.
# NOTE(review): the `owner_num = 0` line (original 117) is missing here.
116 def make_lease(self):
118 renew_secret = os.urandom(32)
119 cancel_secret = os.urandom(32)
120 expiration_time = time.time() + 5000
121 return LeaseInfo(owner_num, renew_secret, cancel_secret,
122 expiration_time, "\x00" * 20)
# No-op server hooks for BucketWriter (bodies missing from this listing).
124 def bucket_writer_closed(self, bw, consumed):
126 def add_latency(self, category, latency):
128 def count(self, name, delta=1):
# Verify a freshly constructed WriteBucketProxy provides IStorageBucketWriter.
131 def test_create(self):
132 bw, rb, sharefname = self.make_bucket("test_create", 500)
# NOTE(review): the WriteBucketProxy argument lines (segment size/count etc.,
# original 134-137) are missing from this listing.
133 bp = WriteBucketProxy(rb,
138 uri_extension_size_max=500, nodeid=None)
139 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
# Shared driver for the v1/v2 layout round-trip tests: write a synthetic share
# (4 blocks, merkle hashes, uri_extension) through wbp_class, then read every
# piece back through rbp_class and compare.
# NOTE(review): many continuation lines are missing from this listing — the
# `for i in range(...)` tails of the three hash-list comprehensions, the
# wbp_class(...) construction (164-168), bp.put_header()/start (170-172), and
# several BucketReader/return lines. Restore from version control before use.
141 def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
142 # Let's pretend each share has 100 bytes of data, and that there are
143 # 4 segments (25 bytes each), and 8 shares total. So the two
144 # per-segment merkle trees (crypttext_hash_tree,
145 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
146 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
147 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
148 # long. That should make the whole share:
150 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
151 # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
153 sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
155 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
157 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
159 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
161 uri_extension = "s" + "E"*498 + "e"
163 bw, rb, sharefname = self.make_bucket(name, sharesize)
169 uri_extension_size_max=len(uri_extension),
173 d.addCallback(lambda res: bp.put_block(0, "a"*25))
174 d.addCallback(lambda res: bp.put_block(1, "b"*25))
175 d.addCallback(lambda res: bp.put_block(2, "c"*25))
176 d.addCallback(lambda res: bp.put_block(3, "d"*20))
177 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
178 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
179 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
180 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
181 d.addCallback(lambda res: bp.close())
183 # now read everything back
184 def _start_reading(res):
185 br = BucketReader(self, sharefname)
188 rbp = rbp_class(rb, peerid="abc", storage_index="")
189 self.failUnless("to peer" in repr(rbp))
190 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
192 d1 = rbp.get_block_data(0, 25, 25)
193 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
194 d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
195 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
196 d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
197 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
198 d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
199 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
201 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
202 d1.addCallback(lambda res:
203 self.failUnlessEqual(res, crypttext_hashes))
204 d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
205 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
206 d1.addCallback(lambda res: rbp.get_share_hashes())
207 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
208 d1.addCallback(lambda res: rbp.get_uri_extension())
209 d1.addCallback(lambda res:
210 self.failUnlessEqual(res, uri_extension))
214 d.addCallback(_start_reading)
218 def test_readwrite_v1(self):
219 return self._do_test_readwrite("test_readwrite_v1",
220 0x24, WriteBucketProxy, ReadBucketProxy)
222 def test_readwrite_v2(self):
223 return self._do_test_readwrite("test_readwrite_v2",
224 0x44, WriteBucketProxy_v2, ReadBucketProxy)
# StorageServer variant whose disk-space probe returns the test-controlled
# attribute DISKAVAIL instead of consulting the real filesystem, so the
# reserved-space tests can simulate arbitrary free-space conditions.
226 class FakeDiskStorageServer(StorageServer):
227 def stat_disk(self, d):
# `d` is the directory argument the real stat_disk receives; ignored here.
228 return self.DISKAVAIL
230 class Server(unittest.TestCase):
# setUp (its `def` header is missing from this listing): start a parent
# service for the servers under test, and a counter for unique lease secrets.
# NOTE(review): `itertools` is used here but does not appear in the visible
# module imports — confirm it is imported at the top of the real file.
233 self.sparent = LoggingServiceParent()
234 self.sparent.startService()
235 self._lease_secret = itertools.count()
# tearDown (header missing): stopping the parent service detaches children.
237 return self.sparent.stopService()
# Per-test working directory under storage/Server/<name>.
# NOTE(review): the return statement of workdir() is missing from this listing.
239 def workdir(self, name):
240 basedir = os.path.join("storage", "Server", name)
# Construct a StorageServer (or given subclass) with an all-zero nodeid and a
# fake stats provider, attached to the test's parent service.
# NOTE(review): the trailing `return ss` (original 248) is missing here.
243 def create(self, name, reserved_space=0, klass=StorageServer):
244 workdir = self.workdir(name)
245 ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
246 stats_provider=FakeStatsProvider())
247 ss.setServiceParent(self.sparent)
# Smoke test: server construction alone must succeed.
250 def test_create(self):
251 ss = self.create("test_create")
# Helper: call remote_allocate_buckets with fresh (unique per call) renew and
# cancel secrets; returns the (already, writers) pair from the server.
253 def allocate(self, ss, storage_index, sharenums, size, canary=None):
254 renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
255 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
# NOTE(review): the `if not canary:` guard (original 256) is missing from this
# listing — as shown, a caller-supplied canary would be clobbered.
257 canary = FakeCanary()
258 return ss.remote_allocate_buckets(storage_index,
259 renew_secret, cancel_secret,
260 sharenums, size, canary)
# Allocate a >4GiB share and write at offset 2**32 to exercise large-offset
# handling; relies on sparse-file support to stay cheap (hence the skip below).
262 def test_large_share(self):
263 ss = self.create("test_large_share")
265 already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
266 self.failUnlessEqual(already, set())
267 self.failUnlessEqual(set(writers.keys()), set([0]))
269 shnum, bucket = writers.items()[0]
270 # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
271 bucket.remote_write(2**32, "ab")
272 bucket.remote_close()
274 readers = ss.remote_get_buckets("allocate")
275 reader = readers[shnum]
276 self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
277 test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X is the only system I know of in the desktop/server area that doesn't support efficient sparse files)."
# NOTE(review): the triple-quote delimiters of this docstring (original lines
# 280/284) and the second os.path.join continuation lines (291/301) are
# missing from this listing.
279 def test_dont_overfill_dirs(self):
281 This test asserts that if you add a second share whose storage index
282 share lots of leading bits with an extant share (but isn't the exact
283 same storage index), this won't add an entry to the share directory.
285 ss = self.create("test_dont_overfill_dirs")
286 already, writers = self.allocate(ss, "storageindex", [0], 10)
287 for i, wb in writers.items():
288 wb.remote_write(0, "%10d" % i)
290 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
292 children_of_storedir = set(os.listdir(storedir))
294 # Now store another one under another storageindex that has leading
295 # chars the same as the first storageindex.
296 already, writers = self.allocate(ss, "storageindey", [0], 10)
297 for i, wb in writers.items():
298 wb.remote_write(0, "%10d" % i)
300 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
302 new_children_of_storedir = set(os.listdir(storedir))
303 self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
# After a share upload completes, the incoming/ bucket and prefix directories
# must be cleaned up, while the incoming/ root itself survives.
# NOTE(review): the wb.remote_close() call inside the loop (original 310) is
# missing from this listing.
305 def test_remove_incoming(self):
306 ss = self.create("test_remove_incoming")
307 already, writers = self.allocate(ss, "vid", range(3), 10)
308 for i,wb in writers.items():
309 wb.remote_write(0, "%10d" % i)
311 incoming_share_dir = wb.incominghome
312 incoming_bucket_dir = os.path.dirname(incoming_share_dir)
313 incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
314 incoming_dir = os.path.dirname(incoming_prefix_dir)
315 self.failIf(os.path.exists(incoming_bucket_dir))
316 self.failIf(os.path.exists(incoming_prefix_dir))
317 self.failUnless(os.path.exists(incoming_dir))
# Full allocate/write/read lifecycle: open buckets are invisible to readers,
# closed shares become readable, re-allocation reports existing shares as
# "already", and aborts free the in-progress tempfiles.
# NOTE(review): several lines are missing from this listing — the
# wb.remote_close() calls (~335), the remote_abort() calls in the abort loops
# (~360/366/368), and a comment block around 336-338.
319 def test_allocate(self):
320 ss = self.create("test_allocate")
322 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
324 canary = FakeCanary()
325 already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
326 self.failUnlessEqual(already, set())
327 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
329 # while the buckets are open, they should not count as readable
330 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
333 for i,wb in writers.items():
334 wb.remote_write(0, "%25d" % i)
336 # aborting a bucket that was already closed is a no-op
339 # now they should be readable
340 b = ss.remote_get_buckets("allocate")
341 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
342 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
344 # now if we ask about writing again, the server should offer those
345 # three buckets as already present. It should offer them even if we
346 # don't ask about those specific ones.
347 already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
348 self.failUnlessEqual(already, set([0,1,2]))
349 self.failUnlessEqual(set(writers.keys()), set([3,4]))
351 # while those two buckets are open for writing, the server should
352 # refuse to offer them to uploaders
354 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
355 self.failUnlessEqual(already2, set([0,1,2]))
356 self.failUnlessEqual(set(writers2.keys()), set([5]))
358 # aborting the writes should remove the tempfiles
359 for i,wb in writers2.items():
361 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
362 self.failUnlessEqual(already2, set([0,1,2]))
363 self.failUnlessEqual(set(writers2.keys()), set([5]))
365 for i,wb in writers2.items():
367 for i,wb in writers.items():
# Simulate the canary firing its disconnect callbacks mid-upload: incoming
# shares must be deleted so a later allocation sees nothing "already" present.
# NOTE(review): the lines that invoke the disconnectors (original ~378-381)
# are missing from this listing.
370 def test_disconnect(self):
371 # simulate a disconnection
372 ss = self.create("test_disconnect")
373 canary = FakeCanary()
374 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
375 self.failUnlessEqual(already, set())
376 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
377 for (f,args,kwargs) in canary.disconnectors.values():
382 # that ought to delete the incoming shares
383 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
384 self.failUnlessEqual(already, set())
385 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
# Exercise reserved_space accounting with a FakeDiskStorageServer whose
# DISKAVAIL is set by hand: provisional allocations for open writers count
# against free space, aborts release it, and closes convert it to real usage.
# NOTE(review): many lines are missing from this listing — the initial
# ss.DISKAVAIL assignment (~391/393), the OVERHEAD definition (~396), the
# abort/close loops (~411-413, 422-425, 439-441), and trailing comments.
387 def test_reserved_space(self):
388 ss = self.create("test_reserved_space", reserved_space=10000,
389 klass=FakeDiskStorageServer)
390 # the FakeDiskStorageServer doesn't do real statvfs() calls
392 # 15k available, 10k reserved, leaves 5k for shares
394 # a newly created and filled share incurs this much overhead, beyond
395 # the size we request.
397 LEASE_SIZE = 4+32+32+4
398 canary = FakeCanary(True)
399 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
400 self.failUnlessEqual(len(writers), 3)
401 # now the StorageServer should have 3000 bytes provisionally
402 # allocated, allowing only 2000 more to be claimed
403 self.failUnlessEqual(len(ss._active_writers), 3)
405 # allocating 1001-byte shares only leaves room for one
406 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
407 self.failUnlessEqual(len(writers2), 1)
408 self.failUnlessEqual(len(ss._active_writers), 4)
410 # we abandon the first set, so their provisional allocation should be
414 self.failUnlessEqual(len(ss._active_writers), 1)
415 # now we have a provisional allocation of 1001 bytes
417 # and we close the second set, so their provisional allocation should
418 # become real, long-term allocation, and grows to include the
420 for bw in writers2.values():
421 bw.remote_write(0, "a"*25)
426 self.failUnlessEqual(len(ss._active_writers), 0)
428 allocated = 1001 + OVERHEAD + LEASE_SIZE
430 # we have to manually increase DISKAVAIL, since we're not doing real
432 ss.DISKAVAIL -= allocated
434 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
435 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
436 already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
437 self.failUnlessEqual(len(writers3), 39)
438 self.failUnlessEqual(len(ss._active_writers), 39)
442 self.failUnlessEqual(len(ss._active_writers), 0)
443 ss.disownServiceParent()
# Fragment of test_seek_behavior (its `def` header, the first write/seek/close
# sequence on the "wb" handle, and the seek(100)/write("a") lines are missing
# from this listing): verifies that reopening with "rb+" permits seeking past
# EOF to create holes without truncating existing content.
447 basedir = self.workdir("test_seek_behavior")
448 fileutil.make_dirs(basedir)
449 filename = os.path.join(basedir, "testfile")
450 f = open(filename, "wb")
453 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
454 # files. mode="a" preserves previous contents but does not allow
455 # seeking-to-create-holes. mode="r+" allows both.
456 f = open(filename, "rb+")
460 filelen = os.stat(filename)[stat.ST_SIZE]
461 self.failUnlessEqual(filelen, 100+3)
462 f2 = open(filename, "rb")
463 self.failUnlessEqual(f2.read(5), "start")
# End-to-end lease lifecycle on immutable shares: create leases via
# allocate_buckets and add_lease, renew with the correct secret only,
# cancel down to zero (which deletes the shares), and confirm overlapping
# uploads of the same SI do not duplicate leases.
# NOTE(review): missing from this listing — the sharenums/size setup
# (~469-471), the wb.remote_close() loops (~479-480, 490-491, 579-580), and
# the continuation lines of the rs2a/rs3/rs4 tagged_hash pairs.
466 def test_leases(self):
467 ss = self.create("test_leases")
468 canary = FakeCanary()
472 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
473 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
474 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
475 sharenums, size, canary)
476 self.failUnlessEqual(len(already), 0)
477 self.failUnlessEqual(len(writers), 5)
478 for wb in writers.values():
481 leases = list(ss.get_leases("si0"))
482 self.failUnlessEqual(len(leases), 1)
483 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
485 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
486 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
487 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
488 sharenums, size, canary)
489 for wb in writers.values():
492 # take out a second lease on si1
493 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
494 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
495 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
496 sharenums, size, canary)
497 self.failUnlessEqual(len(already), 5)
498 self.failUnlessEqual(len(writers), 0)
500 leases = list(ss.get_leases("si1"))
501 self.failUnlessEqual(len(leases), 2)
502 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
504 # and a third lease, using add-lease
505 rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
506 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
507 ss.remote_add_lease("si1", rs2a, cs2a)
508 leases = list(ss.get_leases("si1"))
509 self.failUnlessEqual(len(leases), 3)
510 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
512 # add-lease on a missing storage index is silently ignored
513 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
515 # check that si0 is readable
516 readers = ss.remote_get_buckets("si0")
517 self.failUnlessEqual(len(readers), 5)
519 # renew the first lease. Only the proper renew_secret should work
520 ss.remote_renew_lease("si0", rs0)
521 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
522 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
524 # check that si0 is still readable
525 readers = ss.remote_get_buckets("si0")
526 self.failUnlessEqual(len(readers), 5)
529 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
530 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
531 ss.remote_cancel_lease("si0", cs0)
533 # si0 should now be gone
534 readers = ss.remote_get_buckets("si0")
535 self.failUnlessEqual(len(readers), 0)
536 # and the renew should no longer work
537 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
540 # cancel the first lease on si1, leaving the second and third in place
541 ss.remote_cancel_lease("si1", cs1)
542 readers = ss.remote_get_buckets("si1")
543 self.failUnlessEqual(len(readers), 5)
544 # the corresponding renew should no longer work
545 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
547 leases = list(ss.get_leases("si1"))
548 self.failUnlessEqual(len(leases), 2)
549 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))
551 ss.remote_renew_lease("si1", rs2)
552 # cancelling the second and third should make it go away
553 ss.remote_cancel_lease("si1", cs2)
554 ss.remote_cancel_lease("si1", cs2a)
555 readers = ss.remote_get_buckets("si1")
556 self.failUnlessEqual(len(readers), 0)
557 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
558 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
559 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)
561 leases = list(ss.get_leases("si1"))
562 self.failUnlessEqual(len(leases), 0)
565 # test overlapping uploads
566 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
567 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
568 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
569 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
570 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
571 sharenums, size, canary)
572 self.failUnlessEqual(len(already), 0)
573 self.failUnlessEqual(len(writers), 5)
574 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
575 sharenums, size, canary)
576 self.failUnlessEqual(len(already2), 0)
577 self.failUnlessEqual(len(writers2), 0)
578 for wb in writers.values():
581 leases = list(ss.get_leases("si3"))
582 self.failUnlessEqual(len(leases), 1)
584 already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
585 sharenums, size, canary)
586 self.failUnlessEqual(len(already3), 5)
587 self.failUnlessEqual(len(writers3), 0)
589 leases = list(ss.get_leases("si3"))
590 self.failUnlessEqual(len(leases), 2)
# A readonly server must refuse all allocations, report itself as not
# accepting immutable shares, and (where statvfs exists) report 0 disk_avail.
# NOTE(review): the expected value argument on the continuation line after the
# accepting_immutable_shares assertion (original 603) is missing here.
592 def test_readonly(self):
593 workdir = self.workdir("test_readonly")
594 ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
595 ss.setServiceParent(self.sparent)
597 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
598 self.failUnlessEqual(already, set())
599 self.failUnlessEqual(writers, {})
601 stats = ss.get_stats()
602 self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"],
604 if "storage_server.disk_avail" in stats:
605 # windows does not have os.statvfs, so it doesn't give us disk
606 # stats. But if there are stats, readonly_storage means
608 self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
# With discard_storage=True the server drops written data: shares remain
# listed but reads return zeros.
# NOTE(review): the wb.remote_close() loop line (~622) and one comment line
# (~625) are missing from this listing.
610 def test_discard(self):
611 # discard is really only used for other tests, but we test it anyways
612 workdir = self.workdir("test_discard")
613 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
614 ss.setServiceParent(self.sparent)
616 canary = FakeCanary()
617 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
618 self.failUnlessEqual(already, set())
619 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
620 for i,wb in writers.items():
621 wb.remote_write(0, "%25d" % i)
623 # since we discard the data, the shares should be present but sparse.
624 # Since we write with some seeks, the data we read back will be all
626 b = ss.remote_get_buckets("vid")
627 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
628 self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
# remote_advise_corrupt_share (both the server-level and per-BucketReader
# variants) must write a report file under corruption-advisories/ containing
# the share type, base32 storage index, share number, and the advisory text.
# NOTE(review): the f.read()/f.close() pairs after both open() calls
# (originals ~644-645 and ~667-668) are missing from this listing.
631 def test_advise_corruption(self):
632 workdir = self.workdir("test_advise_corruption")
633 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
634 ss.setServiceParent(self.sparent)
635 si0_s = base32.b2a("si0")
636 ss.remote_advise_corrupt_share("immutable", "si0", 0,
637 "This share smells funny.\n")
638 reportdir = os.path.join(workdir, "corruption-advisories")
639 reports = os.listdir(reportdir)
640 self.failUnlessEqual(len(reports), 1)
641 report_si0 = reports[0]
642 self.failUnless(si0_s in report_si0, report_si0)
643 f = open(os.path.join(reportdir, report_si0), "r")
646 self.failUnless("type: immutable" in report)
647 self.failUnless(("storage_index: %s" % si0_s) in report)
648 self.failUnless("share_number: 0" in report)
649 self.failUnless("This share smells funny." in report)
651 # test the RIBucketWriter version too
652 si1_s = base32.b2a("si1")
653 already,writers = self.allocate(ss, "si1", [1], 75)
654 self.failUnlessEqual(already, set())
655 self.failUnlessEqual(set(writers.keys()), set([1]))
656 writers[1].remote_write(0, "data")
657 writers[1].remote_close()
659 b = ss.remote_get_buckets("si1")
660 self.failUnlessEqual(set(b.keys()), set([1]))
661 b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
663 reports = os.listdir(reportdir)
664 self.failUnlessEqual(len(reports), 2)
665 report_si1 = [r for r in reports if si1_s in r][0]
666 f = open(os.path.join(reportdir, report_si1), "r")
669 self.failUnless("type: immutable" in report)
670 self.failUnless(("storage_index: %s" % si1_s) in report)
671 self.failUnless("share_number: 1" in report)
672 self.failUnless("This share tastes like dust." in report)
676 class MutableServer(unittest.TestCase):
# setUp (header and startService call missing from this listing): parent
# service plus a counter for unique lease secrets; tearDown stops the parent.
679 self.sparent = LoggingServiceParent()
680 self._lease_secret = itertools.count()
682 return self.sparent.stopService()
# Per-test working directory; its return statement is missing from this listing.
684 def workdir(self, name):
685 basedir = os.path.join("storage", "MutableServer", name)
# Build a StorageServer with an all-zero nodeid, attached to the parent
# service. NOTE(review): the trailing `return ss` (original ~692) is missing.
688 def create(self, name):
689 workdir = self.workdir(name)
690 ss = StorageServer(workdir, "\x00" * 20)
691 ss.setServiceParent(self.sparent)
# Smoke test: constructing the mutable-capable server must succeed.
694 def test_create(self):
695 ss = self.create("test_create")
697 def write_enabler(self, we_tag):
698 return hashutil.tagged_hash("we_blah", we_tag)
700 def renew_secret(self, tag):
701 return hashutil.tagged_hash("renew_blah", str(tag))
703 def cancel_secret(self, tag):
704 return hashutil.tagged_hash("cancel_blah", str(tag))
# Create empty mutable slots for `sharenums` via an empty-testv writev, then
# assert the write was accepted and returned no read data.
# NOTE(review): missing from this listing — the read_vector argument and the
# closing of the rstaraw(...) call (originals ~713, 716-717).
706 def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
707 write_enabler = self.write_enabler(we_tag)
708 renew_secret = self.renew_secret(lease_tag)
709 cancel_secret = self.cancel_secret(lease_tag)
710 rstaraw = ss.remote_slot_testv_and_readv_and_writev
711 testandwritev = dict( [ (shnum, ([], [], None) )
712 for shnum in sharenums ] )
714 rc = rstaraw(storage_index,
715 (write_enabler, renew_secret, cancel_secret),
718 (did_write, readv_data) = rc
719 self.failUnless(did_write)
720 self.failUnless(isinstance(readv_data, dict))
721 self.failUnlessEqual(len(readv_data), 0)
# Growing a mutable container within bounds succeeds, exceeding
# MutableShareFile.MAX_SIZE raises DataTooLargeError, and shrinking is
# accepted (without currently affecting the stored share).
# NOTE(review): the allocate(...) continuation (~726) and the read-vector
# arguments closing the rstaraw calls (~734, 742-743, 748) are missing here.
723 def test_container_size(self):
724 ss = self.create("test_container_size")
725 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
727 rstaraw = ss.remote_slot_testv_and_readv_and_writev
728 secrets = ( self.write_enabler("we1"),
729 self.renew_secret("we1"),
730 self.cancel_secret("we1") )
731 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
732 answer = rstaraw("si1", secrets,
733 {0: ([], [(0,data)], len(data)+12)},
735 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
737 # trying to make the container too large will raise an exception
738 TOOBIG = MutableShareFile.MAX_SIZE + 10
739 self.failUnlessRaises(DataTooLargeError,
740 rstaraw, "si1", secrets,
741 {0: ([], [(0,data)], TOOBIG)},
744 # it should be possible to make the container smaller, although at
745 # the moment this doesn't actually affect the share
746 answer = rstaraw("si1", secrets,
747 {0: ([], [(0,data)], len(data)+8)},
749 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
# Mutable slot basics: fresh slots read as empty, writes land, a bad write
# enabler is rejected with the server's nodeid in the message, and failed
# test-vectors leave the data untouched while reporting the probed values.
# NOTE(review): several continuation/expected-value lines are missing from
# this listing (e.g. ~754-755, 758, 762-764, 772, 778, 784, 791-796,
# 799-801, 807-817, 819-820).
751 def test_allocate(self):
752 ss = self.create("test_allocate")
753 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
756 read = ss.remote_slot_readv
757 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
759 self.failUnlessEqual(read("si1", [], [(0, 10)]),
760 {0: [""], 1: [""], 2: [""]})
761 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
765 secrets = ( self.write_enabler("we1"),
766 self.renew_secret("we1"),
767 self.cancel_secret("we1") )
768 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
769 write = ss.remote_slot_testv_and_readv_and_writev
770 answer = write("si1", secrets,
771 {0: ([], [(0,data)], None)},
773 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
775 self.failUnlessEqual(read("si1", [0], [(0,20)]),
776 {0: ["00000000001111111111"]})
777 self.failUnlessEqual(read("si1", [0], [(95,10)]),
779 #self.failUnlessEqual(s0.remote_get_length(), 100)
781 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
782 f = self.failUnlessRaises(BadWriteEnablerError,
783 write, "si1", bad_secrets,
785 self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)
787 # this testv should fail
788 answer = write("si1", secrets,
789 {0: ([(0, 12, "eq", "444444444444"),
790 (20, 5, "eq", "22222"),
797 self.failUnlessEqual(answer, (False,
798 {0: ["000000000011", "22222"],
802 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
805 answer = write("si1", secrets,
806 {0: ([(10, 5, "lt", "11111"),
813 self.failUnlessEqual(answer, (False,
818 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
# Exercise every test-vector comparison operator (lt/le/eq/ne/ge/gt) against
# the bytes "11111" at offset 10: each failing test leaves the data intact,
# each passing test applies the "y"*100 write; data is reset between cases.
# NOTE(review): each write(...) call here is missing its continuation lines
# (the remaining test-vector tuple, write vector such as [(0,"y"*100)], and
# read-vector arguments), and the `reset()` helper plus the reset calls
# between cases are missing from this listing.
821 def test_operators(self):
822 # test operators, the data we're comparing is '11111' in all cases.
823 # test both fail+pass, reset data after each one.
824 ss = self.create("test_operators")
826 secrets = ( self.write_enabler("we1"),
827 self.renew_secret("we1"),
828 self.cancel_secret("we1") )
829 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
830 write = ss.remote_slot_testv_and_readv_and_writev
831 read = ss.remote_slot_readv
834 write("si1", secrets,
835 {0: ([], [(0,data)], None)},
841 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
846 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
847 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
848 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
851 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
856 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
857 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
860 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
865 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
866 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
870 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
875 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
876 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
879 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
884 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
885 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
888 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
893 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
894 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
898 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
903 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
904 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
907 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
912 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
913 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
917 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
922 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
923 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
926 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
931 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
932 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
936 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
941 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
942 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
945 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
950 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
951 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
954 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
959 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
960 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
964 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
969 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
970 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
973 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
978 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
979 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
982 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
987 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
988 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
991 # finally, test some operators against empty shares
992 answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
997 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
998 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
# Write three distinct 100-byte shares in one writev, then confirm a
# wildcard readv (empty sharenum list) returns data for every share.
# NOTE(review): the closing of the write(...) call (~1014), the read-vector
# argument, and the expected values for shares 1 and 2 in the final
# assertion (~1019-1021) are missing from this listing.
1001 def test_readv(self):
1002 ss = self.create("test_readv")
1003 secrets = ( self.write_enabler("we1"),
1004 self.renew_secret("we1"),
1005 self.cancel_secret("we1") )
1006 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1007 write = ss.remote_slot_testv_and_readv_and_writev
1008 read = ss.remote_slot_readv
1009 data = [("%d" % i) * 100 for i in range(3)]
1010 rc = write("si1", secrets,
1011 {0: ([], [(0,data[0])], None),
1012 1: ([], [(0,data[1])], None),
1013 2: ([], [(0,data[2])], None),
1015 self.failUnlessEqual(rc, (True, {}))
1017 answer = read("si1", [], [(0, 10)])
1018 self.failUnlessEqual(answer, {0: ["0"*10],
1022 def compare_leases_without_timestamps(self, leases_a, leases_b):
1023 self.failUnlessEqual(len(leases_a), len(leases_b))
1024 for i in range(len(leases_a)):
1025 num_a, a = leases_a[i]
1026 num_b, b = leases_b[i]
1027 self.failUnlessEqual(num_a, num_b)
1028 self.failUnlessEqual(a.owner_num, b.owner_num)
1029 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1030 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1031 self.failUnlessEqual(a.nodeid, b.nodeid)
1033 def compare_leases(self, leases_a, leases_b):
1034 self.failUnlessEqual(len(leases_a), len(leases_b))
1035 for i in range(len(leases_a)):
1036 num_a, a = leases_a[i]
1037 num_b, b = leases_b[i]
1038 self.failUnlessEqual(num_a, num_b)
1039 self.failUnlessEqual(a.owner_num, b.owner_num)
1040 self.failUnlessEqual(a.renew_secret, b.renew_secret)
1041 self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1042 self.failUnlessEqual(a.nodeid, b.nodeid)
1043 self.failUnlessEqual(a.expiration_time, b.expiration_time)
1045 def test_leases(self):
    # Lease lifecycle on mutable slots: the first write creates an implicit
    # lease; renewals and add_lease with new secrets extend the lease list;
    # the share survives until its *last* lease is cancelled, at which point
    # the slot disappears.
    # NOTE(review): this extract is missing the nested 'def secrets(n):'
    # header (orig. 1047) that the secrets(0)..secrets(5) calls below rely
    # on, plus assorted continuation lines (e.g. orig. 1056, 1063-1064,
    # 1102-1103, 1117, 1120, 1125-1126, 1134, 1156); several calls below are
    # syntactically incomplete as shown.
1046 ss = self.create("test_leases")
1048 return ( self.write_enabler("we1"),
1049 self.renew_secret("we1-%d" % n),
1050 self.cancel_secret("we1-%d" % n) )
1051 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1052 write = ss.remote_slot_testv_and_readv_and_writev
1053 read = ss.remote_slot_readv
1054 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1055 self.failUnlessEqual(rc, (True, {}))
1057 # create a random non-numeric file in the bucket directory, to
1058 # exercise the code that's supposed to ignore those.
1059 bucket_dir = os.path.join(self.workdir("test_leases"),
1060 "shares", storage_index_to_dir("si1"))
1061 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1062 f.write("you ought to be ignoring me\n")
    # the share file for share number 0; its lease list is inspected via the
    # debug_get_leases() helper throughout this test
1065 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1066 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1068 # add-lease on a missing storage index is silently ignored
1069 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1071 # re-allocate the slots and use the same secrets, that should update
1073 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1074 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
    # renewing the existing lease must not add a new one
1077 ss.remote_renew_lease("si1", secrets(0)[1])
1078 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1080 # now allocate them with a bunch of different secrets, to trigger the
1081 # extended lease code. Use add_lease for one of them.
1082 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1083 self.failUnlessEqual(len(s0.debug_get_leases()), 2)
1084 secrets2 = secrets(2)
1085 ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1086 self.failUnlessEqual(len(s0.debug_get_leases()), 3)
1087 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1088 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1089 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1091 self.failUnlessEqual(len(s0.debug_get_leases()), 6)
1093 # cancel one of them
1094 ss.remote_cancel_lease("si1", secrets(5)[2])
1095 self.failUnlessEqual(len(s0.debug_get_leases()), 5)
1097 all_leases = s0.debug_get_leases()
1098 # and write enough data to expand the container, forcing the server
1099 # to move the leases
1100 write("si1", secrets(0),
1101 {0: ([], [(0,data)], 200), },
1104 # read back the leases, make sure they're still intact.
1105 self.compare_leases_without_timestamps(all_leases,
1106 s0.debug_get_leases())
1108 ss.remote_renew_lease("si1", secrets(0)[1])
1109 ss.remote_renew_lease("si1", secrets(1)[1])
1110 ss.remote_renew_lease("si1", secrets(2)[1])
1111 ss.remote_renew_lease("si1", secrets(3)[1])
1112 ss.remote_renew_lease("si1", secrets(4)[1])
1113 self.compare_leases_without_timestamps(all_leases,
1114 s0.debug_get_leases())
1115 # get a new copy of the leases, with the current timestamps. Reading
1116 # data and failing to renew/cancel leases should leave the timestamps
1118 all_leases = s0.debug_get_leases()
1119 # renewing with a bogus token should prompt an error message
1121 # examine the exception thus raised, make sure the old nodeid is
1122 # present, to provide for share migration
1123 e = self.failUnlessRaises(IndexError,
1124 ss.remote_renew_lease, "si1",
    # NOTE(review): the bogus-secret argument and the 'e_s = str(e)' line
    # (orig. 1125-1126) are missing from this extract.
1127 self.failUnless("Unable to renew non-existent lease" in e_s)
1128 self.failUnless("I have leases accepted by nodeids:" in e_s)
1129 self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)
1131 # same for cancelling
1132 self.failUnlessRaises(IndexError,
1133 ss.remote_cancel_lease, "si1",
1135 self.compare_leases(all_leases, s0.debug_get_leases())
1137 # reading shares should not modify the timestamp
1138 read("si1", [], [(0,200)])
1139 self.compare_leases(all_leases, s0.debug_get_leases())
    # appending data renews the writer's lease timestamp, so compare
    # without timestamps from here on
1141 write("si1", secrets(0),
1142 {0: ([], [(200, "make me bigger")], None)}, [])
1143 self.compare_leases_without_timestamps(all_leases,
1144 s0.debug_get_leases())
1146 write("si1", secrets(0),
1147 {0: ([], [(500, "make me really bigger")], None)}, [])
1148 self.compare_leases_without_timestamps(all_leases,
1149 s0.debug_get_leases())
1151 # now cancel them all
1152 ss.remote_cancel_lease("si1", secrets(0)[2])
1153 ss.remote_cancel_lease("si1", secrets(1)[2])
1154 ss.remote_cancel_lease("si1", secrets(2)[2])
1155 ss.remote_cancel_lease("si1", secrets(3)[2])
1157 # the slot should still be there
1158 remaining_shares = read("si1", [], [(0,10)])
1159 self.failUnlessEqual(len(remaining_shares), 1)
1160 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1162 # cancelling a non-existent lease should raise an IndexError
1163 self.failUnlessRaises(IndexError,
1164 ss.remote_cancel_lease, "si1", "nonsecret")
1166 # and the slot should still be there
1167 remaining_shares = read("si1", [], [(0,10)])
1168 self.failUnlessEqual(len(remaining_shares), 1)
1169 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
    # cancelling the final lease removes the share entirely
1171 ss.remote_cancel_lease("si1", secrets(4)[2])
1172 # now the slot should be gone
1173 no_shares = read("si1", [], [(0,10)])
1174 self.failUnlessEqual(no_shares, {})
1176 # cancelling a lease on a non-existent share should raise an IndexError
1177 self.failUnlessRaises(IndexError,
1178 ss.remote_cancel_lease, "si2", "nonsecret")
1180 def test_remove(self):
    # Deleting mutable shares: a writev with new_length=0 removes a share;
    # once the last share is gone, the per-storage-index bucket directory
    # itself is removed (its two-character prefix directory remains).
    # NOTE(review): continuation lines for the allocate call, the writev
    # test/write vectors, and the expected-readv dicts (e.g. orig. 1183,
    # 1191-1192, 1194, 1198-1199, 1202-1203, 1206-1207, 1210-1211, 1214)
    # are missing from this extract; the calls below are incomplete as
    # shown.
1181 ss = self.create("test_remove")
1182 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1184 readv = ss.remote_slot_readv
1185 writev = ss.remote_slot_testv_and_readv_and_writev
1186 secrets = ( self.write_enabler("we1"),
1187 self.renew_secret("we1"),
1188 self.cancel_secret("we1") )
1189 # delete sh0 by setting its size to zero
1190 answer = writev("si1", secrets,
1193 # the answer should mention all the shares that existed before the
1195 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1196 # but a new read should show only sh1 and sh2
1197 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1200 # delete sh1 by setting its size to zero
1201 answer = writev("si1", secrets,
1204 self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1205 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1208 # delete sh2 by setting its size to zero
1209 answer = writev("si1", secrets,
1212 self.failUnlessEqual(answer, (True, {2:[]}) )
1213 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1215 # and the bucket directory should now be gone
1216 si = base32.b2a("si1")
1217 # note: this is a detail of the storage server implementation, and
1218 # may change in the future
    # NOTE(review): the line computing `prefix` (orig. 1219) is missing
    # from this extract; presumably it is the first characters of `si`,
    # matching storage_index_to_dir -- confirm against the full source.
1220 prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1221 bucketdir = os.path.join(prefixdir, si)
1222 self.failUnless(os.path.exists(prefixdir))
1223 self.failIf(os.path.exists(bucketdir))
1225 class Stats(unittest.TestCase):
    # Tests for the storage server's latency-sample bookkeeping
    # (add_latency / get_latencies).
    # NOTE(review): this extract is missing several structural lines: the
    # 'def setUp'/'def tearDown' headers (orig. 1227, 1230), the return
    # statements of workdir/create (orig. 1235, 1241), and the
    # 'for i in range(10):' loop header before the "cancel" samples
    # (orig. 1249).
1228 self.sparent = LoggingServiceParent()
1229 self._lease_secret = itertools.count()
1231 return self.sparent.stopService()
1233 def workdir(self, name):
    # per-test working directory under storage/Server/
1234 basedir = os.path.join("storage", "Server", name)
1237 def create(self, name):
    # build a StorageServer in a fresh workdir, parented under sparent so
    # tearDown stops it with the service hierarchy
1238 workdir = self.workdir(name)
1239 ss = StorageServer(workdir, "\x00" * 20)
1240 ss.setServiceParent(self.sparent)
1243 def test_latencies(self):
    # Feed known latency samples into four operation categories, then
    # check the computed mean and percentile buckets for each category.
1244 ss = self.create("test_latencies")
1245 for i in range(10000):
1246 ss.add_latency("allocate", 1.0 * i)
1247 for i in range(1000):
1248 ss.add_latency("renew", 1.0 * i)
1250 ss.add_latency("cancel", 2.0 * i)
1251 ss.add_latency("get", 5.0)
1253 output = ss.get_latencies()
1255 self.failUnlessEqual(sorted(output.keys()),
1256 sorted(["allocate", "renew", "cancel", "get"]))
    # only 1000 of the 10000 "allocate" samples are retained (the test
    # expects the most recent ones, 9000..9999, per the mean/percentiles)
1257 self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1258 self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
1259 self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
1260 self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
1261 self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
1262 self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
1263 self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
1264 self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
1265 self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)
1267 self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1268 self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
1269 self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1)
1270 self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
1271 self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
1272 self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
1273 self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
1274 self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
1275 self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)
1277 self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1278 self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
1279 self.failUnless(abs(output["cancel"]["01_0_percentile"] - 0) < 1)
1280 self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1)
1281 self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
1282 self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
1283 self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
1284 self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
1285 self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)
    # a single sample: every percentile collapses to that sample's value
1287 self.failUnlessEqual(len(ss.latencies["get"]), 1)
1288 self.failUnless(abs(output["get"]["mean"] - 5) < 1)
1289 self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
1290 self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
1291 self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
1292 self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
1293 self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
1294 self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
1295 self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)
1298 s = re.sub(r'<[^>]*>', ' ', s)
    # strip HTML tags, then collapse whitespace runs to single spaces, so
    # substring checks against rendered status pages are layout-insensitive.
    # NOTE(review): the enclosing 'def remove_tags(s):' header (orig. 1297)
    # and its 'return s' (orig. 1300) are missing from this extract.
1299 s = re.sub(r'\s+', ' ', s)
1302 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
    # Tests for the bucket-counting crawler attached to the storage server,
    # and for how its progress is rendered on the web status page.
    # NOTE(review): this extract is missing the 'def setUp'/'def tearDown'
    # headers (orig. 1304, 1307), both nested 'def _watch():' headers
    # (orig. 1351, 1389), a comment continuation (orig. 1335-1336), and
    # the 'return d' lines ending each test (orig. 1361, 1399).
1305 self.s = service.MultiService()
1306 self.s.startService()
1308 return self.s.stopService()
1310 def test_bucket_counter(self):
    # Watch one full crawler cycle: before it starts, mid-cycle after the
    # first prefix, and after completion, checking the rendered page at
    # each stage.
1311 basedir = "storage/BucketCounter/bucket_counter"
1312 fileutil.make_dirs(basedir)
1313 ss = StorageServer(basedir, "\x00" * 20)
1314 # to make sure we capture the bucket-counting-crawler in the middle
1315 # of a cycle, we reach in and reduce its maximum slice time to 0. We
1316 # also make it start sooner than usual.
1317 ss.bucket_counter.slow_start = 0
1318 orig_cpu_slice = ss.bucket_counter.cpu_slice
1319 ss.bucket_counter.cpu_slice = 0
1320 ss.setServiceParent(self.s)
1322 w = StorageStatus(ss)
1324 # this sample is before the crawler has started doing anything
1325 html = w.renderSynchronously()
1326 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1327 s = remove_tags(html)
1328 self.failUnless("Accepting new shares: Yes" in s, s)
1329 self.failUnless("Reserved space: - 0 B (0)" in s, s)
1330 self.failUnless("Total buckets: Not computed yet" in s, s)
1331 self.failUnless("Next crawl in" in s, s)
1333 # give the bucket-counting-crawler one tick to get started. The
1334 # cpu_slice=0 will force it to yield right after it processes the
1337 d = eventual.fireEventually()
1338 def _check(ignored):
1339 # are we really right after the first prefix?
1340 state = ss.bucket_counter.get_state()
1341 self.failUnlessEqual(state["last-complete-prefix"],
1342 ss.bucket_counter.prefixes[0])
1343 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1344 html = w.renderSynchronously()
1345 s = remove_tags(html)
1346 self.failUnless(" Current crawl " in s, s)
1347 self.failUnless(" (next work in " in s, s)
1348 d.addCallback(_check)
1350 # now give it enough time to complete a full cycle
1352 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1353 d.addCallback(lambda ignored: self.poll(_watch))
1354 def _check2(ignored):
    # after a full cycle: zero buckets counted, next crawl ~1 hour away
1355 ss.bucket_counter.cpu_slice = orig_cpu_slice
1356 html = w.renderSynchronously()
1357 s = remove_tags(html)
1358 self.failUnless("Total buckets: 0 (the number of" in s, s)
1359 self.failUnless("Next crawl in 359" in s, s) # about 3600-1 seconds
1360 d.addCallback(_check2)
1363 def test_bucket_counter_cleanup(self):
    # Verify that bogus entries injected into the crawler's persisted
    # state mid-cycle are discarded when the cycle finishes.
1364 basedir = "storage/BucketCounter/bucket_counter_cleanup"
1365 fileutil.make_dirs(basedir)
1366 ss = StorageServer(basedir, "\x00" * 20)
1367 # to make sure we capture the bucket-counting-crawler in the middle
1368 # of a cycle, we reach in and reduce its maximum slice time to 0.
1369 ss.bucket_counter.slow_start = 0
1370 orig_cpu_slice = ss.bucket_counter.cpu_slice
1371 ss.bucket_counter.cpu_slice = 0
1372 ss.setServiceParent(self.s)
1374 d = eventual.fireEventually()
1376 def _after_first_prefix(ignored):
1377 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1378 # now sneak in and mess with its state, to make sure it cleans up
1379 # properly at the end of the cycle
1380 state = ss.bucket_counter.state
1381 self.failUnlessEqual(state["last-complete-prefix"],
1382 ss.bucket_counter.prefixes[0])
1383 state["bucket-counts"][-12] = {}
1384 state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1385 ss.bucket_counter.save_state()
1386 d.addCallback(_after_first_prefix)
1388 # now give it enough time to complete a cycle
1390 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1391 d.addCallback(lambda ignored: self.poll(_watch))
1392 def _check2(ignored):
    # the injected bogus cycle (-12) and prefix must have been purged
1393 ss.bucket_counter.cpu_slice = orig_cpu_slice
1394 s = ss.bucket_counter.get_state()
1395 self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1396 self.failIf("bogusprefix!" in s["storage-index-samples"],
1397 s["storage-index-samples"].keys())
1398 d.addCallback(_check2)
1401 class NoStatvfsServer(StorageServer):
    # A StorageServer whose do_statvfs always fails, simulating platforms
    # (e.g. Windows) where os.statvfs does not exist.
1402 def do_statvfs(self):
        # mimic the AttributeError that a missing os.statvfs would raise
1403 raise AttributeError
1405 class WebStatus(unittest.TestCase, pollmixin.PollMixin):
    # Tests for the storage-status web page (the StorageStatus renderer).
    # NOTE(review): the 'def setUp'/'def tearDown' headers (orig. 1407,
    # 1410) are missing from this extract; the fixture bodies below start
    # and stop a MultiService that each test parents its StorageServer
    # under.
1408 self.s = service.MultiService()
1409 self.s.startService()
1411 return self.s.stopService()
1413 def test_no_server(self):
1414 w = StorageStatus(None)
1415 html = w.renderSynchronously()
1416 self.failUnless("<h1>No Storage Server Running</h1>" in html, html)
1418 def test_status(self):
1419 basedir = "storage/WebStatus/status"
1420 fileutil.make_dirs(basedir)
1421 ss = StorageServer(basedir, "\x00" * 20)
1422 ss.setServiceParent(self.s)
1423 w = StorageStatus(ss)
1424 html = w.renderSynchronously()
1425 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1426 s = remove_tags(html)
1427 self.failUnless("Accepting new shares: Yes" in s, s)
1428 self.failUnless("Reserved space: - 0 B (0)" in s, s)
1430 def test_status_no_statvfs(self):
1431 # windows has no os.statvfs . Make sure the code handles that even on
    # a platform that does have it, by using NoStatvfsServer, whose
    # do_statvfs always raises AttributeError.
    # NOTE(review): one comment continuation line (orig. 1432) is missing
    # from this extract.
1433 basedir = "storage/WebStatus/status_no_statvfs"
1434 fileutil.make_dirs(basedir)
1435 ss = NoStatvfsServer(basedir, "\x00" * 20)
1436 ss.setServiceParent(self.s)
1437 w = StorageStatus(ss)
1438 html = w.renderSynchronously()
1439 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1440 s = remove_tags(html)
    # without statvfs, the page still renders and shows "?" for disk space
1441 self.failUnless("Accepting new shares: Yes" in s, s)
1442 self.failUnless("Total disk space: ?" in s, s)
1444 def test_readonly(self):
1445 basedir = "storage/WebStatus/readonly"
1446 fileutil.make_dirs(basedir)
1447 ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
1448 ss.setServiceParent(self.s)
1449 w = StorageStatus(ss)
1450 html = w.renderSynchronously()
1451 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1452 s = remove_tags(html)
1453 self.failUnless("Accepting new shares: No" in s, s)
1455 def test_reserved(self):
1456 basedir = "storage/WebStatus/reserved"
1457 fileutil.make_dirs(basedir)
1458 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
1459 ss.setServiceParent(self.s)
1460 w = StorageStatus(ss)
1461 html = w.renderSynchronously()
1462 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1463 s = remove_tags(html)
1464 self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
1466 def test_huge_reserved(self):
1467 basedir = "storage/WebStatus/reserved"
1468 fileutil.make_dirs(basedir)
1469 ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
1470 ss.setServiceParent(self.s)
1471 w = StorageStatus(ss)
1472 html = w.renderSynchronously()
1473 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1474 s = remove_tags(html)
1475 self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
1477 def test_util(self):
1478 w = StorageStatus(None)
1479 self.failUnlessEqual(w.render_space(None, None), "?")
1480 self.failUnlessEqual(w.render_space(None, 10e6), "10.00 MB (10000000)")
1481 self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
1482 self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)