return self._size
def remote_write(self, offset, data):
+ start = time.time()
precondition(not self.closed)
if self.throw_out_all_data:
return
self._sharefile.write_share_data(offset, data)
+ self.ss.add_latency("write", time.time() - start)
def remote_close(self):
precondition(not self.closed)
+ start = time.time()
fileutil.make_dirs(os.path.dirname(self.finalhome))
fileutil.rename(self.incominghome, self.finalhome)
filelen = os.stat(self.finalhome)[stat.ST_SIZE]
self.ss.bucket_writer_closed(self, filelen)
+ self.ss.add_latency("close", time.time() - start)
def _disconnected(self):
if not self.closed:
class BucketReader(Referenceable):
    implements(RIBucketReader)

    def __init__(self, ss, sharefname):
        # keep a reference to the StorageServer so reads can report
        # their latency via ss.add_latency()
        self.ss = ss
        self._share_file = ShareFile(sharefname)

    def remote_read(self, offset, length):
        """Read `length` bytes at `offset` from the share file,
        recording the elapsed time under the "read" latency category."""
        started = time.time()
        data = self._share_file.read_share_data(offset, length)
        elapsed = time.time() - started
        self.ss.add_latency("read", elapsed)
        return data
# the MutableShareFile is like the ShareFile, but used for mutable data. It
log.msg(format="space measurement done, consumed=%(consumed)d bytes",
consumed=self.consumed,
parent=lp, facility="tahoe.storage")
+ self.latencies = {"allocate": [], # immutable
+ "write": [],
+ "close": [],
+ "read": [],
+ "renew": [],
+ "cancel": [],
+ "get": [],
+ "writev": [], # mutable
+ "readv": [],
+ }
+
def add_latency(self, category, latency):
    """Record one latency sample (in seconds) under `category`.

    Only the most recent 1000 samples per category are retained, so
    memory use stays bounded no matter how many operations occur.
    """
    samples = self.latencies[category]
    samples.append(latency)
    if len(samples) > 1000:
        # drop the oldest samples, keeping the newest 1000
        self.latencies[category] = samples[-1000:]
+
def get_latencies(self):
    """Return a dict, indexed by category, that contains a dict of
    latency numbers for each category. Each dict will contain the
    following keys: mean, median, 90_percentile, 95_percentile,
    99_percentile, 999_percentile. If no samples have been collected
    for the given category, then that category name will not be present
    in the return value."""
    # note that Amazon's Dynamo paper says they use 99.9% percentile.
    output = {}
    for category in self.latencies:
        if not self.latencies[category]:
            # no samples recorded: leave the category out entirely
            continue
        samples = sorted(self.latencies[category])
        count = len(samples)
        stats = {}
        stats["mean"] = sum(samples) / count
        # nearest-rank percentiles: the element at index floor(q*count)
        # of the ascending-sorted samples. With few samples, the high
        # percentiles all degrade to the maximum sample.
        stats["median"] = samples[int(0.5 * count)]
        stats["90_percentile"] = samples[int(0.9 * count)]
        stats["95_percentile"] = samples[int(0.95 * count)]
        stats["99_percentile"] = samples[int(0.99 * count)]
        stats["999_percentile"] = samples[int(0.999 * count)]
        output[category] = stats
    return output
def log(self, *args, **kwargs):
if "facility" not in kwargs:
# owner_num is not for clients to set, but rather it should be
# curried into the PersonalStorageServer instance that is dedicated
# to a particular owner.
+ start = time.time()
alreadygot = set()
bucketwriters = {} # k: shnum, v: BucketWriter
si_dir = storage_index_to_dir(storage_index)
if self.readonly_storage:
# we won't accept new shares
+ self.add_latency("allocate", time.time() - start)
return alreadygot, bucketwriters
for shnum in sharenums:
if bucketwriters:
fileutil.make_dirs(os.path.join(self.sharedir, si_dir))
+ self.add_latency("allocate", time.time() - start)
return alreadygot, bucketwriters
def remote_renew_lease(self, storage_index, renew_secret):
+ start = time.time()
new_expire_time = time.time() + 31*24*60*60
found_buckets = False
for shnum, filename in self._get_bucket_shares(storage_index):
else:
pass # non-sharefile
sf.renew_lease(renew_secret, new_expire_time)
+ self.add_latency("renew", time.time() - start)
if not found_buckets:
raise IndexError("no such lease to renew")
def remote_cancel_lease(self, storage_index, cancel_secret):
+ start = time.time()
storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
remaining_files = 0
self.consumed -= total_space_freed
if self.stats_provider:
self.stats_provider.count('storage_server.bytes_freed', total_space_freed)
+ self.add_latency("cancel", time.time() - start)
if not found_buckets:
raise IndexError("no such lease to cancel")
pass
def remote_get_buckets(self, storage_index):
+ start = time.time()
si_s = si_b2a(storage_index)
log.msg("storage: get_buckets %s" % si_s)
bucketreaders = {} # k: sharenum, v: BucketReader
for shnum, filename in self._get_bucket_shares(storage_index):
- bucketreaders[shnum] = BucketReader(filename)
+ bucketreaders[shnum] = BucketReader(self, filename)
+ self.add_latency("get", time.time() - start)
return bucketreaders
def get_leases(self, storage_index):
secrets,
test_and_write_vectors,
read_vector):
+ start = time.time()
si_s = si_b2a(storage_index)
lp = log.msg("storage: slot_writev %s" % si_s)
si_dir = storage_index_to_dir(storage_index)
share.add_or_renew_lease(lease_info)
# all done
+ self.add_latency("writev", time.time() - start)
return (testv_is_good, read_data)
def _allocate_slot_share(self, bucketdir, secrets, sharenum,
return share
def remote_slot_readv(self, storage_index, shares, readv):
+ start = time.time()
si_s = si_b2a(storage_index)
lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
facility="tahoe.storage", level=log.OPERATIONAL)
# shares exist if there is a file for them
bucketdir = os.path.join(self.sharedir, si_dir)
if not os.path.isdir(bucketdir):
+ self.add_latency("readv", time.time() - start)
return {}
datavs = {}
for sharenum_s in os.listdir(bucketdir):
datavs[sharenum] = msf.readv(readv)
log.msg("returning shares %s" % (datavs.keys(),),
facility="tahoe.storage", level=log.NOISY, parent=lp)
+ self.add_latency("readv", time.time() - start)
return datavs
def bucket_writer_closed(self, bw, consumed):
pass
def add_latency(self, category, latency):
    # test double: BucketWriter/BucketReader report latencies to their
    # StorageServer via ss.add_latency(); stats are irrelevant here,
    # so discard the samples.
    pass
def make_lease(self):
owner_num = 0
bw.remote_close()
# now read from it
- br = BucketReader(bw.finalhome)
+ br = BucketReader(self, bw.finalhome)
self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
def bucket_writer_closed(self, bw, consumed):
pass
def add_latency(self, category, latency):
    # test double: BucketWriter/BucketReader report latencies to their
    # StorageServer via ss.add_latency(); stats are irrelevant here,
    # so discard the samples.
    pass
def test_create(self):
bw, rb, sharefname = self.make_bucket("test_create", 500)
# now read everything back
def _start_reading(res):
- br = BucketReader(sharefname)
+ br = BucketReader(self, sharefname)
rb = RemoteBucket()
rb.target = br
rbp = ReadBucketProxy(rb)
no_shares = read("si1", [], [(0,10)])
self.failUnlessEqual(no_shares, {})
class Stats(unittest.TestCase):
    """Exercise StorageServer.add_latency / get_latencies."""

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()

    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        return os.path.join("storage", "Server", name)

    def create(self, name, sizelimit=None):
        ss = StorageServer(self.workdir(name), sizelimit)
        ss.setServiceParent(self.sparent)
        return ss

    def test_latencies(self):
        ss = self.create("test_latencies")
        # feed in known sample populations of varying sizes
        for i in range(10000):
            ss.add_latency("allocate", 1.0 * i)
        for i in range(1000):
            ss.add_latency("renew", 1.0 * i)
        for i in range(10):
            ss.add_latency("cancel", 2.0 * i)
        ss.add_latency("get", 5.0)

        output = ss.get_latencies()

        self.failUnlessEqual(sorted(output.keys()),
                             sorted(["allocate", "renew", "cancel", "get"]))
        # (category, retained sample count,
        #  (mean, median, 90/95/99/999 percentile)), each within +/- 1
        cases = [
            ("allocate", 1000, (9500, 9500, 9900, 9950, 9990, 9999)),
            ("renew",    1000, (500, 500, 900, 950, 990, 999)),
            ("cancel",     10, (9, 10, 18, 18, 18, 18)),
            ("get",         1, (5, 5, 5, 5, 5, 5)),
        ]
        keys = ("mean", "median", "90_percentile", "95_percentile",
                "99_percentile", "999_percentile")
        for (category, count, expected) in cases:
            self.failUnlessEqual(len(ss.latencies[category]), count)
            for (key, want) in zip(keys, expected):
                self.failUnless(abs(output[category][key] - want) < 1)