From 6b7ff02e36c8fc9c308fc2e385ed90d94097742b Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@allmydata.com>
Date: Mon, 16 Jun 2008 15:21:55 -0700
Subject: [PATCH] storage: measure latency-per-operation, calculate
 mean/median/percentiles

---
 src/allmydata/storage.py           | 69 +++++++++++++++++++++++++++--
 src/allmydata/test/test_storage.py | 71 +++++++++++++++++++++++++++++-
 2 files changed, 135 insertions(+), 5 deletions(-)

diff --git a/src/allmydata/storage.py b/src/allmydata/storage.py
index df2589da..5a14a5f8 100644
--- a/src/allmydata/storage.py
+++ b/src/allmydata/storage.py
@@ -203,13 +203,16 @@ class BucketWriter(Referenceable):
         return self._size
 
     def remote_write(self, offset, data):
+        start = time.time()
         precondition(not self.closed)
         if self.throw_out_all_data:
             return
         self._sharefile.write_share_data(offset, data)
+        self.ss.add_latency("write", time.time() - start)
 
     def remote_close(self):
         precondition(not self.closed)
+        start = time.time()
 
         fileutil.make_dirs(os.path.dirname(self.finalhome))
         fileutil.rename(self.incominghome, self.finalhome)
@@ -225,6 +228,7 @@ class BucketWriter(Referenceable):
 
         filelen = os.stat(self.finalhome)[stat.ST_SIZE]
         self.ss.bucket_writer_closed(self, filelen)
+        self.ss.add_latency("close", time.time() - start)
 
     def _disconnected(self):
         if not self.closed:
@@ -252,11 +256,15 @@ class BucketWriter(Referenceable):
 class BucketReader(Referenceable):
     implements(RIBucketReader)
 
-    def __init__(self, sharefname):
+    def __init__(self, ss, sharefname):
+        self.ss = ss
         self._share_file = ShareFile(sharefname)
 
     def remote_read(self, offset, length):
-        return self._share_file.read_share_data(offset, length)
+        start = time.time()
+        data = self._share_file.read_share_data(offset, length)
+        self.ss.add_latency("read", time.time() - start)
+        return data
 
 
 # the MutableShareFile is like the ShareFile, but used for mutable data. It
@@ -704,6 +712,47 @@ class StorageServer(service.MultiService, Referenceable):
             log.msg(format="space measurement done, consumed=%(consumed)d bytes",
                     consumed=self.consumed,
                     parent=lp, facility="tahoe.storage")
+        self.latencies = {"allocate": [], # immutable
+                          "write": [],
+                          "close": [],
+                          "read": [],
+                          "renew": [],
+                          "cancel": [],
+                          "get": [],
+                          "writev": [], # mutable
+                          "readv": [],
+                          }
+
+    def add_latency(self, category, latency):
+        a = self.latencies[category]
+        a.append(latency)
+        if len(a) > 1000:
+            self.latencies[category] = a[-1000:]
+
+    def get_latencies(self):
+        """Return a dict, indexed by category, that contains a dict of
+        latency numbers for each category. Each dict will contain the
+        following keys: mean, median, 90_percentile, 95_percentile,
+        99_percentile). If no samples have been collected for the given
+        category, then that category name will not be present in the return
+        value."""
+        # note that Amazon's Dynamo paper says they use 99.9% percentile.
+        output = {}
+        for category in self.latencies:
+            if not self.latencies[category]:
+                continue
+            stats = {}
+            samples = self.latencies[category][:]
+            samples.sort()
+            count = len(samples)
+            stats["mean"] = sum(samples) / count
+            stats["median"] = samples[int(0.5 * count)]
+            stats["90_percentile"] = samples[int(0.9 * count)]
+            stats["95_percentile"] = samples[int(0.95 * count)]
+            stats["99_percentile"] = samples[int(0.99 * count)]
+            stats["999_percentile"] = samples[int(0.999 * count)]
+            output[category] = stats
+        return output
 
     def log(self, *args, **kwargs):
         if "facility" not in kwargs:
@@ -747,6 +796,7 @@ class StorageServer(service.MultiService, Referenceable):
         # owner_num is not for clients to set, but rather it should be
         # curried into the PersonalStorageServer instance that is dedicated
         # to a particular owner.
+        start = time.time()
         alreadygot = set()
         bucketwriters = {} # k: shnum, v: BucketWriter
         si_dir = storage_index_to_dir(storage_index)
@@ -778,6 +828,7 @@ class StorageServer(service.MultiService, Referenceable):
 
         if self.readonly_storage:
             # we won't accept new shares
+            self.add_latency("allocate", time.time() - start)
             return alreadygot, bucketwriters
 
         for shnum in sharenums:
@@ -809,9 +860,11 @@ class StorageServer(service.MultiService, Referenceable):
         if bucketwriters:
             fileutil.make_dirs(os.path.join(self.sharedir, si_dir))
 
+        self.add_latency("allocate", time.time() - start)
         return alreadygot, bucketwriters
 
     def remote_renew_lease(self, storage_index, renew_secret):
+        start = time.time()
         new_expire_time = time.time() + 31*24*60*60
         found_buckets = False
         for shnum, filename in self._get_bucket_shares(storage_index):
@@ -829,10 +882,12 @@ class StorageServer(service.MultiService, Referenceable):
             else:
                 pass # non-sharefile
             sf.renew_lease(renew_secret, new_expire_time)
+        self.add_latency("renew", time.time() - start)
         if not found_buckets:
             raise IndexError("no such lease to renew")
 
     def remote_cancel_lease(self, storage_index, cancel_secret):
+        start = time.time()
         storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
 
         remaining_files = 0
@@ -873,6 +928,7 @@ class StorageServer(service.MultiService, Referenceable):
             self.consumed -= total_space_freed
         if self.stats_provider:
             self.stats_provider.count('storage_server.bytes_freed', total_space_freed)
+        self.add_latency("cancel", time.time() - start)
         if not found_buckets:
             raise IndexError("no such lease to cancel")
 
@@ -908,11 +964,13 @@ class StorageServer(service.MultiService, Referenceable):
             pass
 
     def remote_get_buckets(self, storage_index):
+        start = time.time()
         si_s = si_b2a(storage_index)
         log.msg("storage: get_buckets %s" % si_s)
         bucketreaders = {} # k: sharenum, v: BucketReader
         for shnum, filename in self._get_bucket_shares(storage_index):
-            bucketreaders[shnum] = BucketReader(filename)
+            bucketreaders[shnum] = BucketReader(self, filename)
+        self.add_latency("get", time.time() - start)
         return bucketreaders
 
     def get_leases(self, storage_index):
@@ -936,6 +994,7 @@ class StorageServer(service.MultiService, Referenceable):
                                                secrets,
                                                test_and_write_vectors,
                                                read_vector):
+        start = time.time()
         si_s = si_b2a(storage_index)
         lp = log.msg("storage: slot_writev %s" % si_s)
         si_dir = storage_index_to_dir(storage_index)
@@ -1002,6 +1061,7 @@ class StorageServer(service.MultiService, Referenceable):
                 share.add_or_renew_lease(lease_info)
 
         # all done
+        self.add_latency("writev", time.time() - start)
         return (testv_is_good, read_data)
 
     def _allocate_slot_share(self, bucketdir, secrets, sharenum,
@@ -1015,6 +1075,7 @@ class StorageServer(service.MultiService, Referenceable):
         return share
 
     def remote_slot_readv(self, storage_index, shares, readv):
+        start = time.time()
         si_s = si_b2a(storage_index)
         lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
                      facility="tahoe.storage", level=log.OPERATIONAL)
@@ -1022,6 +1083,7 @@ class StorageServer(service.MultiService, Referenceable):
         # shares exist if there is a file for them
         bucketdir = os.path.join(self.sharedir, si_dir)
         if not os.path.isdir(bucketdir):
+            self.add_latency("readv", time.time() - start)
             return {}
         datavs = {}
         for sharenum_s in os.listdir(bucketdir):
@@ -1035,6 +1097,7 @@ class StorageServer(service.MultiService, Referenceable):
                 datavs[sharenum] = msf.readv(readv)
         log.msg("returning shares %s" % (datavs.keys(),),
                 facility="tahoe.storage", level=log.NOISY, parent=lp)
+        self.add_latency("readv", time.time() - start)
         return datavs
 
 
diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py
index 26f64a86..64cf7e2e 100644
--- a/src/allmydata/test/test_storage.py
+++ b/src/allmydata/test/test_storage.py
@@ -29,6 +29,8 @@ class Bucket(unittest.TestCase):
 
     def bucket_writer_closed(self, bw, consumed):
         pass
+    def add_latency(self, category, latency):
+        pass
 
     def make_lease(self):
         owner_num = 0
@@ -57,7 +59,7 @@ class Bucket(unittest.TestCase):
         bw.remote_close()
 
         # now read from it
-        br = BucketReader(bw.finalhome)
+        br = BucketReader(self, bw.finalhome)
         self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
         self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
         self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
@@ -92,6 +94,8 @@ class BucketProxy(unittest.TestCase):
 
     def bucket_writer_closed(self, bw, consumed):
         pass
+    def add_latency(self, category, latency):
+        pass
 
     def test_create(self):
         bw, rb, sharefname = self.make_bucket("test_create", 500)
@@ -147,7 +151,7 @@ class BucketProxy(unittest.TestCase):
 
         # now read everything back
         def _start_reading(res):
-            br = BucketReader(sharefname)
+            br = BucketReader(self, sharefname)
             rb = RemoteBucket()
             rb.target = br
             rbp = ReadBucketProxy(rb)
@@ -910,3 +914,66 @@ class MutableServer(unittest.TestCase):
         no_shares = read("si1", [], [(0,10)])
         self.failUnlessEqual(no_shares, {})
 
+class Stats(unittest.TestCase):
+
+    def setUp(self):
+        self.sparent = LoggingServiceParent()
+        self._lease_secret = itertools.count()
+    def tearDown(self):
+        return self.sparent.stopService()
+
+    def workdir(self, name):
+        basedir = os.path.join("storage", "Server", name)
+        return basedir
+
+    def create(self, name, sizelimit=None):
+        workdir = self.workdir(name)
+        ss = StorageServer(workdir, sizelimit)
+        ss.setServiceParent(self.sparent)
+        return ss
+
+    def test_latencies(self):
+        ss = self.create("test_latencies")
+        for i in range(10000):
+            ss.add_latency("allocate", 1.0 * i)
+        for i in range(1000):
+            ss.add_latency("renew", 1.0 * i)
+        for i in range(10):
+            ss.add_latency("cancel", 2.0 * i)
+        ss.add_latency("get", 5.0)
+
+        output = ss.get_latencies()
+
+        self.failUnlessEqual(sorted(output.keys()),
+                             sorted(["allocate", "renew", "cancel", "get"]))
+        self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
+        self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
+        self.failUnless(abs(output["allocate"]["median"] - 9500) < 1)
+        self.failUnless(abs(output["allocate"]["90_percentile"] - 9900) < 1)
+        self.failUnless(abs(output["allocate"]["95_percentile"] - 9950) < 1)
+        self.failUnless(abs(output["allocate"]["99_percentile"] - 9990) < 1)
+        self.failUnless(abs(output["allocate"]["999_percentile"] - 9999) < 1)
+
+        self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
+        self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
+        self.failUnless(abs(output["renew"]["median"] - 500) < 1)
+        self.failUnless(abs(output["renew"]["90_percentile"] - 900) < 1)
+        self.failUnless(abs(output["renew"]["95_percentile"] - 950) < 1)
+        self.failUnless(abs(output["renew"]["99_percentile"] - 990) < 1)
+        self.failUnless(abs(output["renew"]["999_percentile"] - 999) < 1)
+
+        self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
+        self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
+        self.failUnless(abs(output["cancel"]["median"] - 10) < 1)
+        self.failUnless(abs(output["cancel"]["90_percentile"] - 18) < 1)
+        self.failUnless(abs(output["cancel"]["95_percentile"] - 18) < 1)
+        self.failUnless(abs(output["cancel"]["99_percentile"] - 18) < 1)
+        self.failUnless(abs(output["cancel"]["999_percentile"] - 18) < 1)
+
+        self.failUnlessEqual(len(ss.latencies["get"]), 1)
+        self.failUnless(abs(output["get"]["mean"] - 5) < 1)
+        self.failUnless(abs(output["get"]["median"] - 5) < 1)
+        self.failUnless(abs(output["get"]["90_percentile"] - 5) < 1)
+        self.failUnless(abs(output["get"]["95_percentile"] - 5) < 1)
+        self.failUnless(abs(output["get"]["99_percentile"] - 5) < 1)
+        self.failUnless(abs(output["get"]["999_percentile"] - 5) < 1)
-- 
2.45.2