import os, re, weakref, struct, time
-from foolscap import Referenceable
+from foolscap.api import Referenceable
from twisted.application import service
from zope.interface import implements
from allmydata.interfaces import RIStorageServer, IStatsProducer
-from allmydata.util import base32, fileutil, log, time_format
+from allmydata.util import fileutil, idlib, log, time_format
import allmydata # for __full_version__
from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
create_mutable_sharefile
from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
from allmydata.storage.crawler import BucketCountingCrawler
+from allmydata.storage.expirer import LeaseCheckingCrawler
# storage/
# storage/shares/incoming
class StorageServer(service.MultiService, Referenceable):
implements(RIStorageServer, IStatsProducer)
name = 'storage'
+ LeaseCheckerClass = LeaseCheckingCrawler
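+ # (a class attribute, so unit tests can substitute an instrumented
+ # crawler class before instantiating the server)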
def __init__(self, storedir, nodeid, reserved_space=0,
discard_storage=False, readonly_storage=False,
- stats_provider=None):
+ stats_provider=None,
+ expiration_enabled=False,
+ expiration_mode="age",
+ expiration_override_lease_duration=None,
+ expiration_cutoff_date=None,
+ expiration_sharetypes=("mutable", "immutable")):
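+ # The expiration_* arguments configure the lease checker created
+ # below: "age" mode expires leases older than their stated duration
+ # (or than expiration_override_lease_duration, if set), while
+ # "cutoff-date" mode expires leases last renewed before
+ # expiration_cutoff_date; expiration_sharetypes restricts which
+ # share types are examined. See expirer.py for the authoritative
+ # behavior.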
service.MultiService.__init__(self)
assert isinstance(nodeid, str)
assert len(nodeid) == 20
self._clean_incomplete()
fileutil.make_dirs(self.incomingdir)
self._active_writers = weakref.WeakKeyDictionary()
- lp = log.msg("StorageServer created", facility="tahoe.storage")
+ log.msg("StorageServer created", facility="tahoe.storage")
if reserved_space:
if self.get_available_space() is None:
- log.msg("warning: [storage]reserved_space= is set, but this platform does not support statvfs(2), so this reservation cannot be honored",
+ log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
umin="0wZ27w", level=log.UNUSUAL)
self.latencies = {"allocate": [], # immutable
}
self.add_bucket_counter()
+ statefile = os.path.join(self.storedir, "lease_checker.state")
+ historyfile = os.path.join(self.storedir, "lease_checker.history")
+ klass = self.LeaseCheckerClass
+ self.lease_checker = klass(self, statefile, historyfile,
+ expiration_enabled, expiration_mode,
+ expiration_override_lease_duration,
+ expiration_cutoff_date,
+ expiration_sharetypes)
+ self.lease_checker.setServiceParent(self)
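+ # the crawler checkpoints its progress in statefile, so a restarted
+ # node resumes its current cycle rather than starting over;
+ # historyfile accumulates a summary of each completed cycle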
+
+ def __repr__(self):
+ return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
+
def add_bucket_counter(self):
statefile = os.path.join(self.storedir, "bucket_counter.state")
self.bucket_counter = BucketCountingCrawler(self, statefile)
def get_latencies(self):
"""Return a dict, indexed by category, that contains a dict of
- latency numbers for each category. Each dict will contain the
+ latency numbers for each category. If there are sufficient samples
+ for unambiguous interpretation, each dict will contain the
following keys: mean, 01_0_percentile, 10_0_percentile,
50_0_percentile (median), 90_0_percentile, 95_0_percentile,
- 99_0_percentile, 99_9_percentile. If no samples have been collected
- for the given category, then that category name will not be present
- in the return value."""
+ 99_0_percentile, 99_9_percentile. If there are insufficient
+ samples for a given percentile to be interpreted unambiguously,
+ that percentile will be reported as None. If no samples have
+ been collected for the given category, then that category name
+ will not be present in the return value."""
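+ # Example (illustrative values only): with 500 samples, a category
+ # might report {"samplesize": 500, "mean": 0.004,
+ # "01_0_percentile": 0.001, ..., "99_0_percentile": 0.030,
+ # "99_9_percentile": None} -- the 99.9th percentile needs 1000
+ # samples before it is reported.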
# note that Amazon's Dynamo paper says they use 99.9% percentile.
output = {}
for category in self.latencies:
if not self.latencies[category]:
continue
stats = {}
samples = self.latencies[category][:]
- samples.sort()
count = len(samples)
- stats["mean"] = sum(samples) / count
- stats["01_0_percentile"] = samples[int(0.01 * count)]
- stats["10_0_percentile"] = samples[int(0.1 * count)]
- stats["50_0_percentile"] = samples[int(0.5 * count)]
- stats["90_0_percentile"] = samples[int(0.9 * count)]
- stats["95_0_percentile"] = samples[int(0.95 * count)]
- stats["99_0_percentile"] = samples[int(0.99 * count)]
- stats["99_9_percentile"] = samples[int(0.999 * count)]
+ stats["samplesize"] = count
+ samples.sort()
+ if count > 1:
+ stats["mean"] = sum(samples) / count
+ else:
+ stats["mean"] = None
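+ # a single sample is not enough to report an unambiguous mean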
+
+ orderstatlist = [(0.01, "01_0_percentile", 100), (0.1, "10_0_percentile", 10),
+ (0.50, "50_0_percentile", 10), (0.90, "90_0_percentile", 10),
+ (0.95, "95_0_percentile", 20), (0.99, "99_0_percentile", 100),
+ (0.999, "99_9_percentile", 1000)]
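+ # each tuple is (percentile, stats key, minimum number of samples
+ # required before samples[int(percentile*count)] is reported as
+ # that percentile)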
+
+ for percentile, percentilestring, minnumtoobserve in orderstatlist:
+ if count >= minnumtoobserve:
+ stats[percentilestring] = samples[int(percentile*count)]
+ else:
+ stats[percentilestring] = None
+
output[category] = stats
return output
def _clean_incomplete(self):
fileutil.rm_dir(self.incomingdir)
- def do_statvfs(self):
- return os.statvfs(self.storedir)
-
def get_stats(self):
# remember: RIStatsProvider requires that our return dict
# contains numeric values.
stats = { 'storage_server.allocated': self.allocated_size(), }
- stats["storage_server.reserved_space"] = self.reserved_space
+ stats['storage_server.reserved_space'] = self.reserved_space
for category,ld in self.get_latencies().items():
for name,v in ld.items():
stats['storage_server.latencies.%s.%s' % (category, name)] = v
- writeable = True
- if self.readonly_storage:
- writeable = False
+
try:
- s = self.do_statvfs()
- disk_total = s.f_bsize * s.f_blocks
- disk_used = s.f_bsize * (s.f_blocks - s.f_bfree)
- # spacetime predictors should look at the slope of disk_used.
- disk_free_for_root = s.f_bsize * s.f_bfree
- disk_free_for_nonroot = s.f_bsize * s.f_bavail
-
- # include our local policy here: if we stop accepting shares when
- # the available space drops below 1GB, then include that fact in
- # disk_avail.
- disk_avail = disk_free_for_nonroot - self.reserved_space
- disk_avail = max(disk_avail, 0)
- if self.readonly_storage:
- disk_avail = 0
- if disk_avail == 0:
- writeable = False
+ disk = fileutil.get_disk_stats(self.sharedir, self.reserved_space)
+ writeable = disk['avail'] > 0
# spacetime predictors should use disk_avail / (d(disk_used)/dt)
- stats["storage_server.disk_total"] = disk_total
- stats["storage_server.disk_used"] = disk_used
- stats["storage_server.disk_free_for_root"] = disk_free_for_root
- stats["storage_server.disk_free_for_nonroot"] = disk_free_for_nonroot
- stats["storage_server.disk_avail"] = disk_avail
+ stats['storage_server.disk_total'] = disk['total']
+ stats['storage_server.disk_used'] = disk['used']
+ stats['storage_server.disk_free_for_root'] = disk['free_for_root']
+ stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
+ stats['storage_server.disk_avail'] = disk['avail']
except AttributeError:
- # os.statvfs is available only on unix
- pass
- stats["storage_server.accepting_immutable_shares"] = int(writeable)
+ writeable = True
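+ # this platform has no API for disk stats, so we cannot tell;
+ # assume writeable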
+ except EnvironmentError:
+ log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
+ writeable = False
+
+ if self.readonly_storage:
+ stats['storage_server.disk_avail'] = 0
+ writeable = False
+
+ stats['storage_server.accepting_immutable_shares'] = int(writeable)
s = self.bucket_counter.get_state()
bucket_count = s.get("last-complete-bucket-count")
if bucket_count:
- stats["storage_server.total_bucket_count"] = bucket_count
+ stats['storage_server.total_bucket_count'] = bucket_count
return stats
-
- def stat_disk(self, d):
- s = os.statvfs(d)
- # s.f_bavail: available to non-root users
- disk_avail = s.f_bsize * s.f_bavail
- return disk_avail
-
def get_available_space(self):
- # returns None if it cannot be measured (windows)
- try:
- disk_avail = self.stat_disk(self.storedir)
- disk_avail -= self.reserved_space
- except AttributeError:
- disk_avail = None
+ """Returns available space for share storage in bytes, or None if no
+ API to get this information is available."""
+
if self.readonly_storage:
- disk_avail = 0
- return disk_avail
+ return 0
+ return fileutil.get_available_space(self.sharedir, self.reserved_space)
def allocated_size(self):
space = 0
def remote_get_version(self):
remaining_space = self.get_available_space()
if remaining_space is None:
- # we're on a platform that doesn't have 'df', so make a vague
- # guess.
+ # We're on a platform that has no API to get disk stats.
remaining_space = 2**64
+
version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
{ "maximum-immutable-share-size": remaining_space,
"tolerates-immutable-read-overrun": True,
"delete-mutable-shares-with-zero-length-writev": True,
+ "prevents-read-past-end-of-share-data": True,
},
"application-version": str(allmydata.__full_version__),
}
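+ # "prevents-read-past-end-of-share-data" advertises that overrun
+ # reads are truncated at the end of the share data instead of
+ # returning whatever bytes (e.g. lease info) follow it in the file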
# has already been written to disk, where it will show up in
# get_available_space.
remaining_space -= self.allocated_size()
+ # self.readonly_storage causes remaining_space <= 0
# fill alreadygot with all shares that we have, not just the ones
# they asked about: this will save them a lot of work. Add or update
sf = ShareFile(fn)
sf.add_or_renew_lease(lease_info)
- # self.readonly_storage causes remaining_space=0
-
for shnum in sharenums:
incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
if not found_buckets:
raise IndexError("no such lease to renew")
- def remote_cancel_lease(self, storage_index, cancel_secret):
- start = time.time()
- self.count("cancel")
-
- total_space_freed = 0
- found_buckets = False
- for sf in self._iter_share_files(storage_index):
- # note: if we can't find a lease on one share, we won't bother
- # looking in the others. Unless something broke internally
- # (perhaps we ran out of disk space while adding a lease), the
- # leases on all shares will be identical.
- found_buckets = True
- # this raises IndexError if the lease wasn't present XXXX
- total_space_freed += sf.cancel_lease(cancel_secret)
-
- if found_buckets:
- storagedir = os.path.join(self.sharedir,
- storage_index_to_dir(storage_index))
- if not os.listdir(storagedir):
- os.rmdir(storagedir)
-
- if self.stats_provider:
- self.stats_provider.count('storage_server.bytes_freed',
- total_space_freed)
- self.add_latency("cancel", time.time() - start)
- if not found_buckets:
- raise IndexError("no such storage index")
-
def bucket_writer_closed(self, bw, consumed_size):
if self.stats_provider:
self.stats_provider.count('storage_server.bytes_added', consumed_size)
def get_leases(self, storage_index):
"""Provide an iterator that yields all of the leases attached to this
- bucket. Each lease is returned as a tuple of (owner_num,
- renew_secret, cancel_secret, expiration_time).
+ bucket. Each lease is returned as a LeaseInfo instance.
This method is not for client use.
"""
try:
shnum, filename = self._get_bucket_shares(storage_index).next()
sf = ShareFile(filename)
- return sf.iter_leases()
+ return sf.get_leases()
except StopIteration:
return iter([])
start = time.time()
self.count("writev")
si_s = si_b2a(storage_index)
- lp = log.msg("storage: slot_writev %s" % si_s)
+ log.msg("storage: slot_writev %s" % si_s)
si_dir = storage_index_to_dir(storage_index)
(write_enabler, renew_secret, cancel_secret) = secrets
# shares exist if there is a file for them
share_type=share_type, si=si_s, shnum=shnum, reason=reason,
level=log.SCARY, umid="SGx2fA")
return None
-