2 import time, os.path, stat, re
4 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.application import service
8 from foolscap import eventual
10 from allmydata import interfaces
11 from allmydata.util import fileutil, hashutil, base32, pollmixin
12 from allmydata.storage.server import StorageServer, storage_index_to_dir
13 from allmydata.storage.mutable import MutableShareFile
14 from allmydata.storage.immutable import BucketWriter, BucketReader
15 from allmydata.storage.common import DataTooLargeError
16 from allmydata.storage.lease import LeaseInfo
17 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
19 from allmydata.interfaces import BadWriteEnablerError
20 from allmydata.test.common import LoggingServiceParent
21 from allmydata.web.storage import StorageStatus, remove_prefix
26 def __init__(self, ignore_disconnectors=False):
27 self.ignore = ignore_disconnectors
28 self.disconnectors = {}
29 def notifyOnDisconnect(self, f, *args, **kwargs):
33 self.disconnectors[m] = (f, args, kwargs)
35 def dontNotifyOnDisconnect(self, marker):
38 del self.disconnectors[marker]
40 class FakeStatsProvider:
41 def count(self, name, delta=1):
43 def register_producer(self, producer):
46 class Bucket(unittest.TestCase):
47 def make_workdir(self, name):
48 basedir = os.path.join("storage", "Bucket", name)
49 incoming = os.path.join(basedir, "tmp", "bucket")
50 final = os.path.join(basedir, "bucket")
51 fileutil.make_dirs(basedir)
52 fileutil.make_dirs(os.path.join(basedir, "tmp"))
53 return incoming, final
55 def bucket_writer_closed(self, bw, consumed):
57 def add_latency(self, category, latency):
59 def count(self, name, delta=1):
64 renew_secret = os.urandom(32)
65 cancel_secret = os.urandom(32)
66 expiration_time = time.time() + 5000
67 return LeaseInfo(owner_num, renew_secret, cancel_secret,
68 expiration_time, "\x00" * 20)
70 def test_create(self):
71 incoming, final = self.make_workdir("test_create")
72 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
74 bw.remote_write(0, "a"*25)
75 bw.remote_write(25, "b"*25)
76 bw.remote_write(50, "c"*25)
77 bw.remote_write(75, "d"*7)
80 def test_readwrite(self):
81 incoming, final = self.make_workdir("test_readwrite")
82 bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
84 bw.remote_write(0, "a"*25)
85 bw.remote_write(25, "b"*25)
86 bw.remote_write(50, "c"*7) # last block may be short
90 br = BucketReader(self, bw.finalhome)
91 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
92 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
93 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
97 def callRemote(self, methname, *args, **kwargs):
99 meth = getattr(self.target, "remote_" + methname)
100 return meth(*args, **kwargs)
101 return defer.maybeDeferred(_call)
103 class BucketProxy(unittest.TestCase):
104 def make_bucket(self, name, size):
105 basedir = os.path.join("storage", "BucketProxy", name)
106 incoming = os.path.join(basedir, "tmp", "bucket")
107 final = os.path.join(basedir, "bucket")
108 fileutil.make_dirs(basedir)
109 fileutil.make_dirs(os.path.join(basedir, "tmp"))
110 bw = BucketWriter(self, incoming, final, size, self.make_lease(),
116 def make_lease(self):
118 renew_secret = os.urandom(32)
119 cancel_secret = os.urandom(32)
120 expiration_time = time.time() + 5000
121 return LeaseInfo(owner_num, renew_secret, cancel_secret,
122 expiration_time, "\x00" * 20)
124 def bucket_writer_closed(self, bw, consumed):
126 def add_latency(self, category, latency):
128 def count(self, name, delta=1):
131 def test_create(self):
132 bw, rb, sharefname = self.make_bucket("test_create", 500)
133 bp = WriteBucketProxy(rb,
138 uri_extension_size_max=500, nodeid=None)
139 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
141 def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
142 # Let's pretend each share has 100 bytes of data, and that there are
143 # 4 segments (25 bytes each), and 8 shares total. So the two
144 # per-segment merkle trees (crypttext_hash_tree,
145 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
146 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
147 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
148 # long. That should make the whole share:
150 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
151 # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
153 sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
155 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
157 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
159 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
161 uri_extension = "s" + "E"*498 + "e"
163 bw, rb, sharefname = self.make_bucket(name, sharesize)
169 uri_extension_size_max=len(uri_extension),
173 d.addCallback(lambda res: bp.put_block(0, "a"*25))
174 d.addCallback(lambda res: bp.put_block(1, "b"*25))
175 d.addCallback(lambda res: bp.put_block(2, "c"*25))
176 d.addCallback(lambda res: bp.put_block(3, "d"*20))
177 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
178 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
179 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
180 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
181 d.addCallback(lambda res: bp.close())
183 # now read everything back
184 def _start_reading(res):
185 br = BucketReader(self, sharefname)
188 rbp = rbp_class(rb, peerid="abc", storage_index="")
189 self.failUnless("to peer" in repr(rbp))
190 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
192 d1 = rbp.get_block_data(0, 25, 25)
193 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
194 d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
195 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
196 d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
197 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
198 d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
199 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
201 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
202 d1.addCallback(lambda res:
203 self.failUnlessEqual(res, crypttext_hashes))
204 d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
205 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
206 d1.addCallback(lambda res: rbp.get_share_hashes())
207 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
208 d1.addCallback(lambda res: rbp.get_uri_extension())
209 d1.addCallback(lambda res:
210 self.failUnlessEqual(res, uri_extension))
214 d.addCallback(_start_reading)
    def test_readwrite_v1(self):
        # Exercise the round-trip helper against the v1 share layout,
        # which uses a 0x24-byte header (see the size comments in
        # _do_test_readwrite).
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)
    def test_readwrite_v2(self):
        # Same round-trip as v1, but with the v2 writer and its larger
        # 0x44-byte header (presumably wider offset fields -- confirm
        # against WriteBucketProxy_v2).
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)
class FakeDiskStorageServer(StorageServer):
    # Test double: reports a canned amount of available disk space instead
    # of querying the real filesystem, so reserved-space accounting can be
    # tested deterministically.
    def stat_disk(self, d):
        # DISKAVAIL is assigned directly by the tests that use this class
        # (see test_reserved_space); `d` is ignored here.
        return self.DISKAVAIL
230 class Server(unittest.TestCase):
233 self.sparent = LoggingServiceParent()
234 self._lease_secret = itertools.count()
236 return self.sparent.stopService()
238 def workdir(self, name):
239 basedir = os.path.join("storage", "Server", name)
242 def create(self, name, reserved_space=0, klass=StorageServer):
243 workdir = self.workdir(name)
244 ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
245 stats_provider=FakeStatsProvider())
246 ss.setServiceParent(self.sparent)
    def test_create(self):
        # Smoke test: a StorageServer can be instantiated and attached to
        # the service parent without error.
        ss = self.create("test_create")
252 def allocate(self, ss, storage_index, sharenums, size, canary=None):
253 renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
254 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
256 canary = FakeCanary()
257 return ss.remote_allocate_buckets(storage_index,
258 renew_secret, cancel_secret,
259 sharenums, size, canary)
261 def test_large_share(self):
262 ss = self.create("test_large_share")
264 already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
265 self.failUnlessEqual(already, set())
266 self.failUnlessEqual(set(writers.keys()), set([0]))
268 shnum, bucket = writers.items()[0]
269 # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
270 bucket.remote_write(2**32, "ab")
271 bucket.remote_close()
273 readers = ss.remote_get_buckets("allocate")
274 reader = readers[shnum]
275 self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
276 test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X is the only system I know of in the desktop/server area that doesn't support efficient sparse files)."
278 def test_dont_overfill_dirs(self):
280 This test asserts that if you add a second share whose storage index
281 share lots of leading bits with an extant share (but isn't the exact
282 same storage index), this won't add an entry to the share directory.
284 ss = self.create("test_dont_overfill_dirs")
285 already, writers = self.allocate(ss, "storageindex", [0], 10)
286 for i, wb in writers.items():
287 wb.remote_write(0, "%10d" % i)
289 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
291 children_of_storedir = set(os.listdir(storedir))
293 # Now store another one under another storageindex that has leading
294 # chars the same as the first storageindex.
295 already, writers = self.allocate(ss, "storageindey", [0], 10)
296 for i, wb in writers.items():
297 wb.remote_write(0, "%10d" % i)
299 storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
301 new_children_of_storedir = set(os.listdir(storedir))
302 self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
304 def test_remove_incoming(self):
305 ss = self.create("test_remove_incoming")
306 already, writers = self.allocate(ss, "vid", range(3), 10)
307 for i,wb in writers.items():
308 wb.remote_write(0, "%10d" % i)
310 incoming_share_dir = wb.incominghome
311 incoming_bucket_dir = os.path.dirname(incoming_share_dir)
312 incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
313 incoming_dir = os.path.dirname(incoming_prefix_dir)
314 self.failIf(os.path.exists(incoming_bucket_dir))
315 self.failIf(os.path.exists(incoming_prefix_dir))
316 self.failUnless(os.path.exists(incoming_dir))
318 def test_allocate(self):
319 ss = self.create("test_allocate")
321 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
323 canary = FakeCanary()
324 already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
325 self.failUnlessEqual(already, set())
326 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
328 # while the buckets are open, they should not count as readable
329 self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
332 for i,wb in writers.items():
333 wb.remote_write(0, "%25d" % i)
335 # aborting a bucket that was already closed is a no-op
338 # now they should be readable
339 b = ss.remote_get_buckets("allocate")
340 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
341 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
343 # now if we ask about writing again, the server should offer those
344 # three buckets as already present. It should offer them even if we
345 # don't ask about those specific ones.
346 already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
347 self.failUnlessEqual(already, set([0,1,2]))
348 self.failUnlessEqual(set(writers.keys()), set([3,4]))
350 # while those two buckets are open for writing, the server should
351 # refuse to offer them to uploaders
353 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
354 self.failUnlessEqual(already2, set([0,1,2]))
355 self.failUnlessEqual(set(writers2.keys()), set([5]))
357 # aborting the writes should remove the tempfiles
358 for i,wb in writers2.items():
360 already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
361 self.failUnlessEqual(already2, set([0,1,2]))
362 self.failUnlessEqual(set(writers2.keys()), set([5]))
364 for i,wb in writers2.items():
366 for i,wb in writers.items():
369 def test_disconnect(self):
370 # simulate a disconnection
371 ss = self.create("test_disconnect")
372 canary = FakeCanary()
373 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
374 self.failUnlessEqual(already, set())
375 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
376 for (f,args,kwargs) in canary.disconnectors.values():
381 # that ought to delete the incoming shares
382 already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
383 self.failUnlessEqual(already, set())
384 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
386 def test_reserved_space(self):
387 ss = self.create("test_reserved_space", reserved_space=10000,
388 klass=FakeDiskStorageServer)
389 # the FakeDiskStorageServer doesn't do real statvfs() calls
391 # 15k available, 10k reserved, leaves 5k for shares
393 # a newly created and filled share incurs this much overhead, beyond
394 # the size we request.
396 LEASE_SIZE = 4+32+32+4
397 canary = FakeCanary(True)
398 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
399 self.failUnlessEqual(len(writers), 3)
400 # now the StorageServer should have 3000 bytes provisionally
401 # allocated, allowing only 2000 more to be claimed
402 self.failUnlessEqual(len(ss._active_writers), 3)
404 # allocating 1001-byte shares only leaves room for one
405 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
406 self.failUnlessEqual(len(writers2), 1)
407 self.failUnlessEqual(len(ss._active_writers), 4)
409 # we abandon the first set, so their provisional allocation should be
413 self.failUnlessEqual(len(ss._active_writers), 1)
414 # now we have a provisional allocation of 1001 bytes
416 # and we close the second set, so their provisional allocation should
417 # become real, long-term allocation, and grows to include the
419 for bw in writers2.values():
420 bw.remote_write(0, "a"*25)
425 self.failUnlessEqual(len(ss._active_writers), 0)
427 allocated = 1001 + OVERHEAD + LEASE_SIZE
429 # we have to manually increase DISKAVAIL, since we're not doing real
431 ss.DISKAVAIL -= allocated
433 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
434 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
435 already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
436 self.failUnlessEqual(len(writers3), 39)
437 self.failUnlessEqual(len(ss._active_writers), 39)
441 self.failUnlessEqual(len(ss._active_writers), 0)
442 ss.disownServiceParent()
446 basedir = self.workdir("test_seek_behavior")
447 fileutil.make_dirs(basedir)
448 filename = os.path.join(basedir, "testfile")
449 f = open(filename, "wb")
452 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
453 # files. mode="a" preserves previous contents but does not allow
454 # seeking-to-create-holes. mode="r+" allows both.
455 f = open(filename, "rb+")
459 filelen = os.stat(filename)[stat.ST_SIZE]
460 self.failUnlessEqual(filelen, 100+3)
461 f2 = open(filename, "rb")
462 self.failUnlessEqual(f2.read(5), "start")
465 def test_leases(self):
466 ss = self.create("test_leases")
467 canary = FakeCanary()
471 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
472 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
473 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
474 sharenums, size, canary)
475 self.failUnlessEqual(len(already), 0)
476 self.failUnlessEqual(len(writers), 5)
477 for wb in writers.values():
480 leases = list(ss.get_leases("si0"))
481 self.failUnlessEqual(len(leases), 1)
482 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
484 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
485 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
486 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
487 sharenums, size, canary)
488 for wb in writers.values():
491 # take out a second lease on si1
492 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
493 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
494 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
495 sharenums, size, canary)
496 self.failUnlessEqual(len(already), 5)
497 self.failUnlessEqual(len(writers), 0)
499 leases = list(ss.get_leases("si1"))
500 self.failUnlessEqual(len(leases), 2)
501 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
503 # and a third lease, using add-lease
504 rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
505 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
506 ss.remote_add_lease("si1", rs2a, cs2a)
507 leases = list(ss.get_leases("si1"))
508 self.failUnlessEqual(len(leases), 3)
509 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
511 # add-lease on a missing storage index is silently ignored
512 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
514 # check that si0 is readable
515 readers = ss.remote_get_buckets("si0")
516 self.failUnlessEqual(len(readers), 5)
518 # renew the first lease. Only the proper renew_secret should work
519 ss.remote_renew_lease("si0", rs0)
520 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
521 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
523 # check that si0 is still readable
524 readers = ss.remote_get_buckets("si0")
525 self.failUnlessEqual(len(readers), 5)
528 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
529 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
530 ss.remote_cancel_lease("si0", cs0)
532 # si0 should now be gone
533 readers = ss.remote_get_buckets("si0")
534 self.failUnlessEqual(len(readers), 0)
535 # and the renew should no longer work
536 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
539 # cancel the first lease on si1, leaving the second and third in place
540 ss.remote_cancel_lease("si1", cs1)
541 readers = ss.remote_get_buckets("si1")
542 self.failUnlessEqual(len(readers), 5)
543 # the corresponding renew should no longer work
544 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
546 leases = list(ss.get_leases("si1"))
547 self.failUnlessEqual(len(leases), 2)
548 self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))
550 ss.remote_renew_lease("si1", rs2)
551 # cancelling the second and third should make it go away
552 ss.remote_cancel_lease("si1", cs2)
553 ss.remote_cancel_lease("si1", cs2a)
554 readers = ss.remote_get_buckets("si1")
555 self.failUnlessEqual(len(readers), 0)
556 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
557 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
558 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)
560 leases = list(ss.get_leases("si1"))
561 self.failUnlessEqual(len(leases), 0)
564 # test overlapping uploads
565 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
566 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
567 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
568 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
569 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
570 sharenums, size, canary)
571 self.failUnlessEqual(len(already), 0)
572 self.failUnlessEqual(len(writers), 5)
573 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
574 sharenums, size, canary)
575 self.failUnlessEqual(len(already2), 0)
576 self.failUnlessEqual(len(writers2), 0)
577 for wb in writers.values():
580 leases = list(ss.get_leases("si3"))
581 self.failUnlessEqual(len(leases), 1)
583 already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
584 sharenums, size, canary)
585 self.failUnlessEqual(len(already3), 5)
586 self.failUnlessEqual(len(writers3), 0)
588 leases = list(ss.get_leases("si3"))
589 self.failUnlessEqual(len(leases), 2)
591 def test_readonly(self):
592 workdir = self.workdir("test_readonly")
593 ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
594 ss.setServiceParent(self.sparent)
596 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
597 self.failUnlessEqual(already, set())
598 self.failUnlessEqual(writers, {})
600 stats = ss.get_stats()
601 self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"],
603 if "storage_server.disk_avail" in stats:
604 # windows does not have os.statvfs, so it doesn't give us disk
605 # stats. But if there are stats, readonly_storage means
607 self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
609 def test_discard(self):
610 # discard is really only used for other tests, but we test it anyways
611 workdir = self.workdir("test_discard")
612 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
613 ss.setServiceParent(self.sparent)
615 canary = FakeCanary()
616 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
617 self.failUnlessEqual(already, set())
618 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
619 for i,wb in writers.items():
620 wb.remote_write(0, "%25d" % i)
622 # since we discard the data, the shares should be present but sparse.
623 # Since we write with some seeks, the data we read back will be all
625 b = ss.remote_get_buckets("vid")
626 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
627 self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
629 def test_advise_corruption(self):
630 workdir = self.workdir("test_advise_corruption")
631 ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
632 ss.setServiceParent(self.sparent)
634 si0_s = base32.b2a("si0")
635 ss.remote_advise_corrupt_share("immutable", "si0", 0,
636 "This share smells funny.\n")
637 reportdir = os.path.join(workdir, "corruption-advisories")
638 reports = os.listdir(reportdir)
639 self.failUnlessEqual(len(reports), 1)
640 report_si0 = reports[0]
641 self.failUnless(si0_s in report_si0, report_si0)
642 f = open(os.path.join(reportdir, report_si0), "r")
645 self.failUnless("type: immutable" in report)
646 self.failUnless(("storage_index: %s" % si0_s) in report)
647 self.failUnless("share_number: 0" in report)
648 self.failUnless("This share smells funny." in report)
650 # test the RIBucketWriter version too
651 si1_s = base32.b2a("si1")
652 already,writers = self.allocate(ss, "si1", [1], 75)
653 self.failUnlessEqual(already, set())
654 self.failUnlessEqual(set(writers.keys()), set([1]))
655 writers[1].remote_write(0, "data")
656 writers[1].remote_close()
658 b = ss.remote_get_buckets("si1")
659 self.failUnlessEqual(set(b.keys()), set([1]))
660 b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
662 reports = os.listdir(reportdir)
663 self.failUnlessEqual(len(reports), 2)
664 report_si1 = [r for r in reports if si1_s in r][0]
665 f = open(os.path.join(reportdir, report_si1), "r")
668 self.failUnless("type: immutable" in report)
669 self.failUnless(("storage_index: %s" % si1_s) in report)
670 self.failUnless("share_number: 1" in report)
671 self.failUnless("This share tastes like dust." in report)
675 class MutableServer(unittest.TestCase):
678 self.sparent = LoggingServiceParent()
679 self._lease_secret = itertools.count()
681 return self.sparent.stopService()
683 def workdir(self, name):
684 basedir = os.path.join("storage", "MutableServer", name)
687 def create(self, name):
688 workdir = self.workdir(name)
689 ss = StorageServer(workdir, "\x00" * 20)
690 ss.setServiceParent(self.sparent)
    def test_create(self):
        # Smoke test: the server used for mutable-slot tests can be
        # created and started without error.
        ss = self.create("test_create")
    def write_enabler(self, we_tag):
        # Derive a deterministic write-enabler secret from `we_tag`, so
        # tests can re-create the same secret on demand.
        return hashutil.tagged_hash("we_blah", we_tag)
    def renew_secret(self, tag):
        # Deterministic lease renew-secret for `tag` (tag may be any
        # stringifiable value, e.g. the lease counter).
        return hashutil.tagged_hash("renew_blah", str(tag))
    def cancel_secret(self, tag):
        # Deterministic lease cancel-secret for `tag`, parallel to
        # renew_secret() above.
        return hashutil.tagged_hash("cancel_blah", str(tag))
705 def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
706 write_enabler = self.write_enabler(we_tag)
707 renew_secret = self.renew_secret(lease_tag)
708 cancel_secret = self.cancel_secret(lease_tag)
709 rstaraw = ss.remote_slot_testv_and_readv_and_writev
710 testandwritev = dict( [ (shnum, ([], [], None) )
711 for shnum in sharenums ] )
713 rc = rstaraw(storage_index,
714 (write_enabler, renew_secret, cancel_secret),
717 (did_write, readv_data) = rc
718 self.failUnless(did_write)
719 self.failUnless(isinstance(readv_data, dict))
720 self.failUnlessEqual(len(readv_data), 0)
722 def test_container_size(self):
723 ss = self.create("test_container_size")
724 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
726 rstaraw = ss.remote_slot_testv_and_readv_and_writev
727 secrets = ( self.write_enabler("we1"),
728 self.renew_secret("we1"),
729 self.cancel_secret("we1") )
730 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
731 answer = rstaraw("si1", secrets,
732 {0: ([], [(0,data)], len(data)+12)},
734 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
736 # trying to make the container too large will raise an exception
737 TOOBIG = MutableShareFile.MAX_SIZE + 10
738 self.failUnlessRaises(DataTooLargeError,
739 rstaraw, "si1", secrets,
740 {0: ([], [(0,data)], TOOBIG)},
743 # it should be possible to make the container smaller, although at
744 # the moment this doesn't actually affect the share
745 answer = rstaraw("si1", secrets,
746 {0: ([], [(0,data)], len(data)+8)},
748 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
750 def test_allocate(self):
751 ss = self.create("test_allocate")
752 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
755 read = ss.remote_slot_readv
756 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
758 self.failUnlessEqual(read("si1", [], [(0, 10)]),
759 {0: [""], 1: [""], 2: [""]})
760 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
764 secrets = ( self.write_enabler("we1"),
765 self.renew_secret("we1"),
766 self.cancel_secret("we1") )
767 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
768 write = ss.remote_slot_testv_and_readv_and_writev
769 answer = write("si1", secrets,
770 {0: ([], [(0,data)], None)},
772 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
774 self.failUnlessEqual(read("si1", [0], [(0,20)]),
775 {0: ["00000000001111111111"]})
776 self.failUnlessEqual(read("si1", [0], [(95,10)]),
778 #self.failUnlessEqual(s0.remote_get_length(), 100)
780 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
781 f = self.failUnlessRaises(BadWriteEnablerError,
782 write, "si1", bad_secrets,
784 self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)
786 # this testv should fail
787 answer = write("si1", secrets,
788 {0: ([(0, 12, "eq", "444444444444"),
789 (20, 5, "eq", "22222"),
796 self.failUnlessEqual(answer, (False,
797 {0: ["000000000011", "22222"],
801 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
804 answer = write("si1", secrets,
805 {0: ([(10, 5, "lt", "11111"),
812 self.failUnlessEqual(answer, (False,
817 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
820 def test_operators(self):
821 # test operators, the data we're comparing is '11111' in all cases.
822 # test both fail+pass, reset data after each one.
823 ss = self.create("test_operators")
825 secrets = ( self.write_enabler("we1"),
826 self.renew_secret("we1"),
827 self.cancel_secret("we1") )
828 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
829 write = ss.remote_slot_testv_and_readv_and_writev
830 read = ss.remote_slot_readv
833 write("si1", secrets,
834 {0: ([], [(0,data)], None)},
840 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
845 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
846 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
847 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
850 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
855 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
856 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
859 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
864 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
865 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
869 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
874 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
875 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
878 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
883 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
884 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
887 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
892 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
893 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
897 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
902 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
903 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
906 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
911 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
912 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
916 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
921 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
922 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
925 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
930 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
931 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
935 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
940 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
941 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
944 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
949 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
950 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
953 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
958 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
959 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
963 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
968 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
969 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
972 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
977 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
978 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
981 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
986 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
987 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
990 # finally, test some operators against empty shares
991 answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
996 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
997 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1000 def test_readv(self):
1001 ss = self.create("test_readv")
1002 secrets = ( self.write_enabler("we1"),
1003 self.renew_secret("we1"),
1004 self.cancel_secret("we1") )
1005 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1006 write = ss.remote_slot_testv_and_readv_and_writev
1007 read = ss.remote_slot_readv
1008 data = [("%d" % i) * 100 for i in range(3)]
1009 rc = write("si1", secrets,
1010 {0: ([], [(0,data[0])], None),
1011 1: ([], [(0,data[1])], None),
1012 2: ([], [(0,data[2])], None),
1014 self.failUnlessEqual(rc, (True, {}))
1016 answer = read("si1", [], [(0, 10)])
1017 self.failUnlessEqual(answer, {0: ["0"*10],
def compare_leases_without_timestamps(self, leases_a, leases_b):
    """Assert that two debug_get_leases() lists are equivalent, ignoring
    expiration times.

    Each element is a (lease_number, lease) pair; every lease field
    except expiration_time is compared, so a renewal (which only bumps
    the timestamp) still compares equal.

    Raises the TestCase failure exception on any mismatch.
    """
    self.failUnlessEqual(len(leases_a), len(leases_b))
    # pair-wise walk instead of the old index loop over range(len(...))
    for (num_a, a), (num_b, b) in zip(leases_a, leases_b):
        self.failUnlessEqual(num_a, num_b)
        self.failUnlessEqual(a.owner_num, b.owner_num)
        self.failUnlessEqual(a.renew_secret, b.renew_secret)
        self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
        self.failUnlessEqual(a.nodeid, b.nodeid)
def compare_leases(self, leases_a, leases_b):
    """Assert that two debug_get_leases() lists are fully identical,
    including expiration times.

    Each element is a (lease_number, lease) pair.  Unlike
    compare_leases_without_timestamps, a renewed lease (newer
    expiration_time) does NOT compare equal here.

    Raises the TestCase failure exception on any mismatch.
    """
    self.failUnlessEqual(len(leases_a), len(leases_b))
    # pair-wise walk instead of the old index loop over range(len(...))
    for (num_a, a), (num_b, b) in zip(leases_a, leases_b):
        self.failUnlessEqual(num_a, num_b)
        self.failUnlessEqual(a.owner_num, b.owner_num)
        self.failUnlessEqual(a.renew_secret, b.renew_secret)
        self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
        self.failUnlessEqual(a.nodeid, b.nodeid)
        self.failUnlessEqual(a.expiration_time, b.expiration_time)
# Full lease lifecycle on a mutable slot: implicit lease on create, renew,
# add_lease, cancel, and lease preservation across container resizes.
# The slot is only deleted once its last lease has been cancelled.
# NOTE(review): this excerpt is missing a number of original lines
# (e.g. the 'def secrets(n):' header above the return statement below,
# and several call-continuation lines) -- confirm against the full file.
1044 def test_leases(self):
1045 ss = self.create("test_leases")
1047 return ( self.write_enabler("we1"),
1048 self.renew_secret("we1-%d" % n),
1049 self.cancel_secret("we1-%d" % n) )
1050 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1051 write = ss.remote_slot_testv_and_readv_and_writev
1052 read = ss.remote_slot_readv
# the initial write creates share 0 and gives it one (implicit) lease
1053 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1054 self.failUnlessEqual(rc, (True, {}))
1056 # create a random non-numeric file in the bucket directory, to
1057 # exercise the code that's supposed to ignore those.
1058 bucket_dir = os.path.join(self.workdir("test_leases"),
1059 "shares", storage_index_to_dir("si1"))
1060 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1061 f.write("you ought to be ignoring me\n")
1064 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1065 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1067 # add-lease on a missing storage index is silently ignored
1068 self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1070 # re-allocate the slots and use the same secrets, that should update
1072 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1073 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
# renewing with a known secret must not create a second lease
1076 ss.remote_renew_lease("si1", secrets(0)[1])
1077 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1079 # now allocate them with a bunch of different secrets, to trigger the
1080 # extended lease code. Use add_lease for one of them.
1081 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1082 self.failUnlessEqual(len(s0.debug_get_leases()), 2)
1083 secrets2 = secrets(2)
1084 ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1085 self.failUnlessEqual(len(s0.debug_get_leases()), 3)
1086 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1087 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1088 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1090 self.failUnlessEqual(len(s0.debug_get_leases()), 6)
1092 # cancel one of them
1093 ss.remote_cancel_lease("si1", secrets(5)[2])
1094 self.failUnlessEqual(len(s0.debug_get_leases()), 5)
1096 all_leases = s0.debug_get_leases()
1097 # and write enough data to expand the container, forcing the server
1098 # to move the leases
1099 write("si1", secrets(0),
1100 {0: ([], [(0,data)], 200), },
1103 # read back the leases, make sure they're still intact.
1104 self.compare_leases_without_timestamps(all_leases,
1105 s0.debug_get_leases())
1107 ss.remote_renew_lease("si1", secrets(0)[1])
1108 ss.remote_renew_lease("si1", secrets(1)[1])
1109 ss.remote_renew_lease("si1", secrets(2)[1])
1110 ss.remote_renew_lease("si1", secrets(3)[1])
1111 ss.remote_renew_lease("si1", secrets(4)[1])
1112 self.compare_leases_without_timestamps(all_leases,
1113 s0.debug_get_leases())
1114 # get a new copy of the leases, with the current timestamps. Reading
1115 # data and failing to renew/cancel leases should leave the timestamps
1117 all_leases = s0.debug_get_leases()
1118 # renewing with a bogus token should prompt an error message
1120 # examine the exception thus raised, make sure the old nodeid is
1121 # present, to provide for share migration
1122 e = self.failUnlessRaises(IndexError,
1123 ss.remote_renew_lease, "si1",
1126 self.failUnless("Unable to renew non-existent lease" in e_s)
1127 self.failUnless("I have leases accepted by nodeids:" in e_s)
1128 self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)
1130 # same for cancelling
1131 self.failUnlessRaises(IndexError,
1132 ss.remote_cancel_lease, "si1",
1134 self.compare_leases(all_leases, s0.debug_get_leases())
1136 # reading shares should not modify the timestamp
1137 read("si1", [], [(0,200)])
1138 self.compare_leases(all_leases, s0.debug_get_leases())
# ordinary extending writes must preserve all leases (timestamps may move)
1140 write("si1", secrets(0),
1141 {0: ([], [(200, "make me bigger")], None)}, [])
1142 self.compare_leases_without_timestamps(all_leases,
1143 s0.debug_get_leases())
1145 write("si1", secrets(0),
1146 {0: ([], [(500, "make me really bigger")], None)}, [])
1147 self.compare_leases_without_timestamps(all_leases,
1148 s0.debug_get_leases())
1150 # now cancel them all
1151 ss.remote_cancel_lease("si1", secrets(0)[2])
1152 ss.remote_cancel_lease("si1", secrets(1)[2])
1153 ss.remote_cancel_lease("si1", secrets(2)[2])
1154 ss.remote_cancel_lease("si1", secrets(3)[2])
1156 # the slot should still be there
1157 remaining_shares = read("si1", [], [(0,10)])
1158 self.failUnlessEqual(len(remaining_shares), 1)
1159 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1161 # cancelling a non-existent lease should raise an IndexError
1162 self.failUnlessRaises(IndexError,
1163 ss.remote_cancel_lease, "si1", "nonsecret")
1165 # and the slot should still be there
1166 remaining_shares = read("si1", [], [(0,10)])
1167 self.failUnlessEqual(len(remaining_shares), 1)
1168 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
# cancelling the final lease removes the slot entirely
1170 ss.remote_cancel_lease("si1", secrets(4)[2])
1171 # now the slot should be gone
1172 no_shares = read("si1", [], [(0,10)])
1173 self.failUnlessEqual(no_shares, {})
1175 # cancelling a lease on a non-existent share should raise an IndexError
1176 self.failUnlessRaises(IndexError,
1177 ss.remote_cancel_lease, "si2", "nonsecret")
# Shares in a mutable slot are deleted by writing them with new_length=0;
# once the last share is gone the bucket directory itself is removed.
# NOTE(review): this excerpt is missing the continuation lines of each
# writev() call (the test/write vectors) and of each readv() expectation,
# plus the 'prefix = ...' assignment -- confirm against the full file.
1179 def test_remove(self):
1180 ss = self.create("test_remove")
1181 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1183 readv = ss.remote_slot_readv
1184 writev = ss.remote_slot_testv_and_readv_and_writev
1185 secrets = ( self.write_enabler("we1"),
1186 self.renew_secret("we1"),
1187 self.cancel_secret("we1") )
1188 # delete sh0 by setting its size to zero
1189 answer = writev("si1", secrets,
1192 # the answer should mention all the shares that existed before the
1194 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1195 # but a new read should show only sh1 and sh2
1196 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1199 # delete sh1 by setting its size to zero
1200 answer = writev("si1", secrets,
1203 self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1204 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1207 # delete sh2 by setting its size to zero
1208 answer = writev("si1", secrets,
1211 self.failUnlessEqual(answer, (True, {2:[]}) )
1212 self.failUnlessEqual(readv("si1", [], [(0,10)]),
1214 # and the bucket directory should now be gone
1215 si = base32.b2a("si1")
1216 # note: this is a detail of the storage server implementation, and
1217 # may change in the future
1219 prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1220 bucketdir = os.path.join(prefixdir, si)
# the two-letter prefix directory survives, the per-SI bucket does not
1221 self.failUnless(os.path.exists(prefixdir))
1222 self.failIf(os.path.exists(bucketdir))
# Tests for the storage server's latency-sample bookkeeping
# (add_latency / get_latencies).
# NOTE(review): this excerpt is missing several original lines (the
# 'def setUp'/'def tearDown' headers, the 'return' lines of workdir()
# and create(), and two 'for' loop headers before the "cancel" and
# "get" samples) -- confirm against the full file.
1224 class Stats(unittest.TestCase):
1227 self.sparent = LoggingServiceParent()
1228 self._lease_secret = itertools.count()
1230 return self.sparent.stopService()
1232 def workdir(self, name):
1233 basedir = os.path.join("storage", "Server", name)
1236 def create(self, name):
1237 workdir = self.workdir(name)
1238 ss = StorageServer(workdir, "\x00" * 20)
1239 ss.setServiceParent(self.sparent)
1242 def test_latencies(self):
1243 ss = self.create("test_latencies")
# feed 10000 "allocate" samples; the length assertion below shows only
# the most recent 1000 (values 9000..9999) are retained
1244 for i in range(10000):
1245 ss.add_latency("allocate", 1.0 * i)
1246 for i in range(1000):
1247 ss.add_latency("renew", 1.0 * i)
1249 ss.add_latency("cancel", 2.0 * i)
1250 ss.add_latency("get", 5.0)
1252 output = ss.get_latencies()
1254 self.failUnlessEqual(sorted(output.keys()),
1255 sorted(["allocate", "renew", "cancel", "get"]))
# allocate: truncated to the last 1000 samples, so mean/median are ~9500
1256 self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1257 self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
1258 self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
1259 self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
1260 self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
1261 self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
1262 self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
1263 self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
1264 self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)
# renew: exactly 1000 samples (0..999), nothing truncated
1266 self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1267 self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
1268 self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1)
1269 self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
1270 self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
1271 self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
1272 self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
1273 self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
1274 self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)
# cancel: only 10 samples, so the upper percentiles all hit the max (18)
1276 self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1277 self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
1278 self.failUnless(abs(output["cancel"]["01_0_percentile"] - 0) < 1)
1279 self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1)
1280 self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
1281 self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
1282 self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
1283 self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
1284 self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)
# get: a single sample, so every statistic collapses to that value
1286 self.failUnlessEqual(len(ss.latencies["get"]), 1)
1287 self.failUnless(abs(output["get"]["mean"] - 5) < 1)
1288 self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
1289 self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
1290 self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
1291 self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
1292 self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
1293 self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
1294 self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)
1297 s = re.sub(r'<[^>]*>', ' ', s)
1298 s = re.sub(r'\s+', ' ', s)
# Tests for the bucket-counting crawler and how its progress is rendered
# on the storage status page.
# NOTE(review): this excerpt is missing several original lines (the
# 'def setUp'/'def tearDown' headers, the 'def _watch():' headers that
# apparently precede the bare 'return not ...' poll bodies, and the
# trailing 'return d' of each test) -- confirm against the full file.
1301 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1304 self.s = service.MultiService()
1305 self.s.startService()
1307 return self.s.stopService()
1309 def test_bucket_counter(self):
1310 basedir = "storage/BucketCounter/bucket_counter"
1311 fileutil.make_dirs(basedir)
1312 ss = StorageServer(basedir, "\x00" * 20)
1313 # to make sure we capture the bucket-counting-crawler in the middle
1314 # of a cycle, we reach in and reduce its maximum slice time to 0.
1315 orig_cpu_slice = ss.bucket_counter.cpu_slice
1316 ss.bucket_counter.cpu_slice = 0
1317 ss.setServiceParent(self.s)
1319 w = StorageStatus(ss)
1321 # this sample is before the crawler has started doing anything
1322 html = w.renderSynchronously()
1323 self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1324 s = remove_tags(html)
1325 self.failUnless("Accepting new shares: Yes" in s, s)
1326 self.failUnless("Reserved space: - 0 B (0)" in s, s)
1327 self.failUnless("Total buckets: Not computed yet" in s, s)
1328 self.failUnless("Next crawl in" in s, s)
1330 # give the bucket-counting-crawler one tick to get started. The
1331 # cpu_slice=0 will force it to yield right after it processes the
1334 d = eventual.fireEventually()
1335 def _check(ignored):
1336 # are we really right after the first prefix?
1337 state = ss.bucket_counter.get_state()
1338 self.failUnlessEqual(state["last-complete-prefix"],
1339 ss.bucket_counter.prefixes[0])
1340 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
# while a crawl is in progress, the page reports it
1341 html = w.renderSynchronously()
1342 s = remove_tags(html)
1343 self.failUnless(" Current crawl " in s, s)
1344 self.failUnless(" (next work in " in s, s)
1345 d.addCallback(_check)
1347 # now give it enough time to complete a full cycle
1349 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1350 d.addCallback(lambda ignored: self.poll(_watch))
1351 def _check2(ignored):
1352 ss.bucket_counter.cpu_slice = orig_cpu_slice
1353 html = w.renderSynchronously()
1354 s = remove_tags(html)
1355 self.failUnless("Total buckets: 0 (the number of" in s, s)
1356 self.failUnless("Next crawl in 359" in s, s) # about 3600-1 seconds
1357 d.addCallback(_check2)
1360 def test_bucket_counter_cleanup(self):
1361 basedir = "storage/BucketCounter/bucket_counter_cleanup"
1362 fileutil.make_dirs(basedir)
1363 ss = StorageServer(basedir, "\x00" * 20)
1364 # to make sure we capture the bucket-counting-crawler in the middle
1365 # of a cycle, we reach in and reduce its maximum slice time to 0.
1366 orig_cpu_slice = ss.bucket_counter.cpu_slice
1367 ss.bucket_counter.cpu_slice = 0
1368 ss.setServiceParent(self.s)
1370 d = eventual.fireEventually()
1372 def _after_first_prefix(ignored):
1373 ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1374 # now sneak in and mess with its state, to make sure it cleans up
1375 # properly at the end of the cycle
1376 state = ss.bucket_counter.state
1377 self.failUnlessEqual(state["last-complete-prefix"],
1378 ss.bucket_counter.prefixes[0])
# inject bogus entries that a completed cycle must discard
1379 state["share-counts"][-12] = {}
1380 state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1381 ss.bucket_counter.save_state()
1382 d.addCallback(_after_first_prefix)
1384 # now give it enough time to complete a cycle
1386 return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1387 d.addCallback(lambda ignored: self.poll(_watch))
1388 def _check2(ignored):
1389 ss.bucket_counter.cpu_slice = orig_cpu_slice
# the bogus entries injected above must have been cleaned out
1390 s = ss.bucket_counter.get_state()
1391 self.failIf(-12 in s["share-counts"], s["share-counts"].keys())
1392 self.failIf("bogusprefix!" in s["storage-index-samples"],
1393 s["storage-index-samples"].keys())
1394 d.addCallback(_check2)
class NoStatvfsServer(StorageServer):
    """Storage server variant whose do_statvfs always fails.

    Used to simulate platforms (like windows) where os.statvfs is
    unavailable, so the status page must cope without disk statistics.
    """
    def do_statvfs(self):
        # behave exactly as if os.statvfs did not exist
        raise AttributeError()
# Tests for the storage-status web page (StorageStatus).
# NOTE(review): the 'def setUp'/'def tearDown' headers are missing from
# this excerpt; these lines look like the usual MultiService
# setUp/tearDown bodies -- confirm against the full file.
1401 class WebStatus(unittest.TestCase, pollmixin.PollMixin):
1404 self.s = service.MultiService()
1405 self.s.startService()
1407 return self.s.stopService()
def test_no_server(self):
    """With no server attached, StorageStatus renders a placeholder page."""
    page = StorageStatus(None)
    rendered = page.renderSynchronously()
    self.failUnless("<h1>No Storage Server Running</h1>" in rendered, rendered)
def test_status(self):
    """A freshly-started server's status page renders and reports that it
    is accepting shares with no reserved space."""
    workdir = "storage/WebStatus/status"
    fileutil.make_dirs(workdir)
    server = StorageServer(workdir, "\x00" * 20)
    server.setServiceParent(self.s)
    html = StorageStatus(server).renderSynchronously()
    self.failUnless("<h1>Storage Server Status</h1>" in html, html)
    text = remove_tags(html)
    for expected in ["Accepting new shares: Yes",
                     "Reserved space: - 0 B (0)"]:
        self.failUnless(expected in text, text)
def test_status_no_statvfs(self):
    """The status page must render even where os.statvfs is missing
    (windows has no os.statvfs); disk sizes then show up as '?'."""
    workdir = "storage/WebStatus/status_no_statvfs"
    fileutil.make_dirs(workdir)
    server = NoStatvfsServer(workdir, "\x00" * 20)
    server.setServiceParent(self.s)
    html = StorageStatus(server).renderSynchronously()
    self.failUnless("<h1>Storage Server Status</h1>" in html, html)
    text = remove_tags(html)
    self.failUnless("Accepting new shares: Yes" in text, text)
    self.failUnless("Total disk space: ?" in text, text)
def test_readonly(self):
    """A server started with readonly_storage=True advertises that it is
    not accepting new shares."""
    workdir = "storage/WebStatus/readonly"
    fileutil.make_dirs(workdir)
    server = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
    server.setServiceParent(self.s)
    html = StorageStatus(server).renderSynchronously()
    self.failUnless("<h1>Storage Server Status</h1>" in html, html)
    text = remove_tags(html)
    self.failUnless("Accepting new shares: No" in text, text)
def test_reserved(self):
    """reserved_space shows up on the status page, both abbreviated and
    as the raw byte count."""
    workdir = "storage/WebStatus/reserved"
    fileutil.make_dirs(workdir)
    server = StorageServer(workdir, "\x00" * 20, reserved_space=10e6)
    server.setServiceParent(self.s)
    html = StorageStatus(server).renderSynchronously()
    self.failUnless("<h1>Storage Server Status</h1>" in html, html)
    text = remove_tags(html)
    self.failUnless("Reserved space: - 10.00 MB (10000000)" in text, text)
def test_huge_reserved(self):
    """Like test_reserved, but intended to exercise a huge reservation.

    NOTE(review): this looks copy/pasted from test_reserved -- it used
    the SAME basedir ("storage/WebStatus/reserved", now fixed to its own
    directory so the two tests no longer share a workdir) and it still
    reserves only 10 MB, which is not actually 'huge'.  TODO: switch to
    a genuinely huge reserved_space with a matching expected string.
    """
    basedir = "storage/WebStatus/huge_reserved"
    fileutil.make_dirs(basedir)
    ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
    ss.setServiceParent(self.s)
    w = StorageStatus(ss)
    html = w.renderSynchronously()
    self.failUnless("<h1>Storage Server Status</h1>" in html, html)
    s = remove_tags(html)
    self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
# Unit checks for the page's helper functions: render_space formatting
# and the remove_prefix string utility.
# NOTE(review): this method ends at the edge of the excerpt -- confirm
# against the full file that nothing follows these assertions.
1473 def test_util(self):
1474 w = StorageStatus(None)
# render_space: None means "unknown" and renders as "?"
1475 self.failUnlessEqual(w.render_space(None, None), "?")
1476 self.failUnlessEqual(w.render_space(None, 10e6), "10.00 MB (10000000)")
# remove_prefix returns the suffix on a match, None otherwise
1477 self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
1478 self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)