From 6b7ff02e36c8fc9c308fc2e385ed90d94097742b Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Mon, 16 Jun 2008 15:21:55 -0700 Subject: [PATCH] storage: measure latency-per-operation, calculate mean/median/percentiles --- src/allmydata/storage.py | 69 +++++++++++++++++++++++++++-- src/allmydata/test/test_storage.py | 71 +++++++++++++++++++++++++++++- 2 files changed, 135 insertions(+), 5 deletions(-) diff --git a/src/allmydata/storage.py b/src/allmydata/storage.py index df2589da..5a14a5f8 100644 --- a/src/allmydata/storage.py +++ b/src/allmydata/storage.py @@ -203,13 +203,16 @@ class BucketWriter(Referenceable): return self._size def remote_write(self, offset, data): + start = time.time() precondition(not self.closed) if self.throw_out_all_data: return self._sharefile.write_share_data(offset, data) + self.ss.add_latency("write", time.time() - start) def remote_close(self): precondition(not self.closed) + start = time.time() fileutil.make_dirs(os.path.dirname(self.finalhome)) fileutil.rename(self.incominghome, self.finalhome) @@ -225,6 +228,7 @@ class BucketWriter(Referenceable): filelen = os.stat(self.finalhome)[stat.ST_SIZE] self.ss.bucket_writer_closed(self, filelen) + self.ss.add_latency("close", time.time() - start) def _disconnected(self): if not self.closed: @@ -252,11 +256,15 @@ class BucketWriter(Referenceable): class BucketReader(Referenceable): implements(RIBucketReader) - def __init__(self, sharefname): + def __init__(self, ss, sharefname): + self.ss = ss self._share_file = ShareFile(sharefname) def remote_read(self, offset, length): - return self._share_file.read_share_data(offset, length) + start = time.time() + data = self._share_file.read_share_data(offset, length) + self.ss.add_latency("read", time.time() - start) + return data # the MutableShareFile is like the ShareFile, but used for mutable data. It @@ -704,6 +712,47 @@ class StorageServer(service.MultiService, Referenceable): log.msg(format="space measurement done, consumed=%(consumed)d bytes", consumed=self.consumed, parent=lp, facility="tahoe.storage") + self.latencies = {"allocate": [], # immutable + "write": [], + "close": [], + "read": [], + "renew": [], + "cancel": [], + "get": [], + "writev": [], # mutable + "readv": [], + } + + def add_latency(self, category, latency): + a = self.latencies[category] + a.append(latency) + if len(a) > 1000: + self.latencies[category] = a[-1000:] + + def get_latencies(self): + """Return a dict, indexed by category, that contains a dict of + latency numbers for each category. Each dict will contain the + following keys: mean, median, 90_percentile, 95_percentile, + 99_percentile). If no samples have been collected for the given + category, then that category name will not be present in the return + value.""" + # note that Amazon's Dynamo paper says they use 99.9% percentile. + output = {} + for category in self.latencies: + if not self.latencies[category]: + continue + stats = {} + samples = self.latencies[category][:] + samples.sort() + count = len(samples) + stats["mean"] = sum(samples) / count + stats["median"] = samples[int(0.5 * count)] + stats["90_percentile"] = samples[int(0.9 * count)] + stats["95_percentile"] = samples[int(0.95 * count)] + stats["99_percentile"] = samples[int(0.99 * count)] + stats["999_percentile"] = samples[int(0.999 * count)] + output[category] = stats + return output def log(self, *args, **kwargs): if "facility" not in kwargs: @@ -747,6 +796,7 @@ class StorageServer(service.MultiService, Referenceable): # owner_num is not for clients to set, but rather it should be # curried into the PersonalStorageServer instance that is dedicated # to a particular owner. + start = time.time() alreadygot = set() bucketwriters = {} # k: shnum, v: BucketWriter si_dir = storage_index_to_dir(storage_index) @@ -778,6 +828,7 @@ class StorageServer(service.MultiService, Referenceable): if self.readonly_storage: # we won't accept new shares + self.add_latency("allocate", time.time() - start) return alreadygot, bucketwriters for shnum in sharenums: @@ -809,9 +860,11 @@ class StorageServer(service.MultiService, Referenceable): if bucketwriters: fileutil.make_dirs(os.path.join(self.sharedir, si_dir)) + self.add_latency("allocate", time.time() - start) return alreadygot, bucketwriters def remote_renew_lease(self, storage_index, renew_secret): + start = time.time() new_expire_time = time.time() + 31*24*60*60 found_buckets = False for shnum, filename in self._get_bucket_shares(storage_index): @@ -829,10 +882,12 @@ class StorageServer(service.MultiService, Referenceable): else: pass # non-sharefile sf.renew_lease(renew_secret, new_expire_time) + self.add_latency("renew", time.time() - start) if not found_buckets: raise IndexError("no such lease to renew") def remote_cancel_lease(self, storage_index, cancel_secret): + start = time.time() storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index)) remaining_files = 0 @@ -873,6 +928,7 @@ class StorageServer(service.MultiService, Referenceable): self.consumed -= total_space_freed if self.stats_provider: self.stats_provider.count('storage_server.bytes_freed', total_space_freed) + self.add_latency("cancel", time.time() - start) if not found_buckets: raise IndexError("no such lease to cancel") @@ -908,11 +964,13 @@ class StorageServer(service.MultiService, Referenceable): pass def remote_get_buckets(self, storage_index): + start = time.time() si_s = si_b2a(storage_index) log.msg("storage: get_buckets %s" % si_s) bucketreaders = {} # k: sharenum, v: BucketReader for shnum, filename in self._get_bucket_shares(storage_index): - bucketreaders[shnum] = BucketReader(filename) + bucketreaders[shnum] = BucketReader(self, filename) + self.add_latency("get", time.time() - start) return bucketreaders def get_leases(self, storage_index): @@ -936,6 +994,7 @@ class StorageServer(service.MultiService, Referenceable): secrets, test_and_write_vectors, read_vector): + start = time.time() si_s = si_b2a(storage_index) lp = log.msg("storage: slot_writev %s" % si_s) si_dir = storage_index_to_dir(storage_index) @@ -1002,6 +1061,7 @@ class StorageServer(service.MultiService, Referenceable): share.add_or_renew_lease(lease_info) # all done + self.add_latency("writev", time.time() - start) return (testv_is_good, read_data) def _allocate_slot_share(self, bucketdir, secrets, sharenum, @@ -1015,6 +1075,7 @@ class StorageServer(service.MultiService, Referenceable): return share def remote_slot_readv(self, storage_index, shares, readv): + start = time.time() si_s = si_b2a(storage_index) lp = log.msg("storage: slot_readv %s %s" % (si_s, shares), facility="tahoe.storage", level=log.OPERATIONAL) @@ -1022,6 +1083,7 @@ class StorageServer(service.MultiService, Referenceable): # shares exist if there is a file for them bucketdir = os.path.join(self.sharedir, si_dir) if not os.path.isdir(bucketdir): + self.add_latency("readv", time.time() - start) return {} datavs = {} for sharenum_s in os.listdir(bucketdir): @@ -1035,6 +1097,7 @@ class StorageServer(service.MultiService, Referenceable): datavs[sharenum] = msf.readv(readv) log.msg("returning shares %s" % (datavs.keys(),), facility="tahoe.storage", level=log.NOISY, parent=lp) + self.add_latency("readv", time.time() - start) return datavs diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 26f64a86..64cf7e2e 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -29,6 +29,8 @@ class Bucket(unittest.TestCase): def bucket_writer_closed(self, bw, consumed): pass + def add_latency(self, category, latency): + pass def make_lease(self): owner_num = 0 @@ -57,7 +59,7 @@ class Bucket(unittest.TestCase): bw.remote_close() # now read from it - br = BucketReader(bw.finalhome) + br = BucketReader(self, bw.finalhome) self.failUnlessEqual(br.remote_read(0, 25), "a"*25) self.failUnlessEqual(br.remote_read(25, 25), "b"*25) self.failUnlessEqual(br.remote_read(50, 7), "c"*7) @@ -92,6 +94,8 @@ class BucketProxy(unittest.TestCase): def bucket_writer_closed(self, bw, consumed): pass + def add_latency(self, category, latency): + pass def test_create(self): bw, rb, sharefname = self.make_bucket("test_create", 500) @@ -147,7 +151,7 @@ class BucketProxy(unittest.TestCase): # now read everything back def _start_reading(res): - br = BucketReader(sharefname) + br = BucketReader(self, sharefname) rb = RemoteBucket() rb.target = br rbp = ReadBucketProxy(rb) @@ -910,3 +914,66 @@ class MutableServer(unittest.TestCase): no_shares = read("si1", [], [(0,10)]) self.failUnlessEqual(no_shares, {}) +class Stats(unittest.TestCase): + + def setUp(self): + self.sparent = LoggingServiceParent() + self._lease_secret = itertools.count() + def tearDown(self): + return self.sparent.stopService() + + def workdir(self, name): + basedir = os.path.join("storage", "Server", name) + return basedir + + def create(self, name, sizelimit=None): + workdir = self.workdir(name) + ss = StorageServer(workdir, sizelimit) + ss.setServiceParent(self.sparent) + return ss + + def test_latencies(self): + ss = self.create("test_latencies") + for i in range(10000): + ss.add_latency("allocate", 1.0 * i) + for i in range(1000): + ss.add_latency("renew", 1.0 * i) + for i in range(10): + ss.add_latency("cancel", 2.0 * i) + ss.add_latency("get", 5.0) + + output = ss.get_latencies() + + self.failUnlessEqual(sorted(output.keys()), + sorted(["allocate", "renew", "cancel", "get"])) + self.failUnlessEqual(len(ss.latencies["allocate"]), 1000) + self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1) + self.failUnless(abs(output["allocate"]["median"] - 9500) < 1) + self.failUnless(abs(output["allocate"]["90_percentile"] - 9900) < 1) + self.failUnless(abs(output["allocate"]["95_percentile"] - 9950) < 1) + self.failUnless(abs(output["allocate"]["99_percentile"] - 9990) < 1) + self.failUnless(abs(output["allocate"]["999_percentile"] - 9999) < 1) + + self.failUnlessEqual(len(ss.latencies["renew"]), 1000) + self.failUnless(abs(output["renew"]["mean"] - 500) < 1) + self.failUnless(abs(output["renew"]["median"] - 500) < 1) + self.failUnless(abs(output["renew"]["90_percentile"] - 900) < 1) + self.failUnless(abs(output["renew"]["95_percentile"] - 950) < 1) + self.failUnless(abs(output["renew"]["99_percentile"] - 990) < 1) + self.failUnless(abs(output["renew"]["999_percentile"] - 999) < 1) + + self.failUnlessEqual(len(ss.latencies["cancel"]), 10) + self.failUnless(abs(output["cancel"]["mean"] - 9) < 1) + self.failUnless(abs(output["cancel"]["median"] - 10) < 1) + self.failUnless(abs(output["cancel"]["90_percentile"] - 18) < 1) + self.failUnless(abs(output["cancel"]["95_percentile"] - 18) < 1) + self.failUnless(abs(output["cancel"]["99_percentile"] - 18) < 1) + self.failUnless(abs(output["cancel"]["999_percentile"] - 18) < 1) + + self.failUnlessEqual(len(ss.latencies["get"]), 1) + self.failUnless(abs(output["get"]["mean"] - 5) < 1) + self.failUnless(abs(output["get"]["median"] - 5) < 1) + self.failUnless(abs(output["get"]["90_percentile"] - 5) < 1) + self.failUnless(abs(output["get"]["95_percentile"] - 5) < 1) + self.failUnless(abs(output["get"]["99_percentile"] - 5) < 1) + self.failUnless(abs(output["get"]["999_percentile"] - 5) < 1) -- 2.45.2