"shares.total" in the "[client]" section. The default parameters are still
3-of-10.
+The inefficient storage 'sizelimit' control (which established an upper bound
+on the amount of space that a storage server is allowed to consume) has been
+replaced by a lightweight 'reserved_space' control (which establishes a lower
+bound on the amount of remaining space). The storage server will reject all
+writes that would cause the remaining disk space (as measured by a '/bin/df'
+equivalent) to drop below this value. The "[storage]reserved_space="
+tahoe.cfg parameter controls this setting. (Note that this only affects
+immutable shares: it is an outstanding bug that reserved_space does not
+prevent the allocation of new mutable shares, nor does it prevent the growth
+of existing mutable shares.)
+
** CLI Changes
This release adds the 'tahoe create-alias' command, which is a combination of
The Mac GUI in src/allmydata/gui/ has been improved.
+
* Release 1.2.0 (2008-07-21)
** Security
written and modified anyway. See ticket #390 for the current status of this
bug. The default value is False.
-sizelimit = (str, optional)
-
- If provided, this value establishes an upper bound (in bytes) on the amount
- of storage consumed by share data (data that your node holds on behalf of
- clients that are uploading files to the grid). To avoid providing more than
- 100MB of data to other clients, set this key to "100MB". Note that this is a
- fairly loose bound, and the node may occasionally use slightly more storage
- than this. To enforce a stronger (and possibly more reliable) limit, use a
- symlink to place the 'storage/' directory on a separate size-limited
- filesystem, and/or use per-user OS/filesystem quotas. If a size limit is
- specified then Tahoe will do a "du" at startup (traversing all the storage
- and summing the sizes of the files), which can take a long time if there are
- a lot of shares stored.
+reserved_space = (str, optional)
+
+ If provided, this value defines how much disk space is reserved: the storage
+ server will not accept any share that would cause the amount of remaining
+ free space (as measured by 'df', or more specifically statvfs(2)) to drop
+ below this value.
This string contains a number, with an optional case-insensitive scale
- suffix like "K" or "M" or "G", and an optional "B" suffix. So "100MB",
- "100M", "100000000B", "100000000", and "100000kb" all mean the same thing.
+ suffix like "K" or "M" or "G", and an optional "B" or "iB" suffix. So
+ "100MB", "100M", "100000000B", "100000000", and "100000kb" all mean the same
+ thing. Likewise, "1MiB", "1024KiB", and "1048576B" all mean the same thing.
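+
+ For example, to leave at least 10 gigabytes free on the disk that holds the
+ node's 'storage/' directory, the [storage] section of tahoe.cfg might look
+ like this (a minimal sketch; pick a value that suits your disk):
+
+   [storage]
+   enabled = true
+   reserved_space = 10G
+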
== Running A Helper ==
-import os, stat, time, re, weakref
+import os, stat, time, weakref
from allmydata.interfaces import RIStorageServer
from allmydata import node
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, cachedir
+from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.uri import LiteralFileURI
from allmydata.dirnode import NewDirectoryNode
from allmydata.mutable.node import MutableFileNode, MutableWatcher
self.set_config("storage", "enabled", "false")
if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
self.set_config("storage", "readonly", "true")
- copy("sizelimit", "storage", "sizelimit")
if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
self.set_config("storage", "debug_discard", "true")
if os.path.exists(os.path.join(self.basedir, "run_helper")):
storedir = os.path.join(self.basedir, self.STOREDIR)
- sizelimit = None
- data = self.get_config("storage", "sizelimit", None)
- if data:
- m = re.match(r"^(\d+)([kKmMgG]?[bB]?)$", data)
- if not m:
- log.msg("SIZELIMIT_FILE contains unparseable value %s" % data)
- else:
- number, suffix = m.groups()
- suffix = suffix.upper()
- if suffix.endswith("B"):
- suffix = suffix[:-1]
- multiplier = {"": 1,
- "K": 1000,
- "M": 1000 * 1000,
- "G": 1000 * 1000 * 1000,
- }[suffix]
- sizelimit = int(number) * multiplier
+ data = self.get_config("storage", "reserved_space", None)
+ reserved = None
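+ # parse_abbreviated_size handles suffixes like "100MB", "5G", or "1MiB";
+ # a missing value (None) yields None, which we map to 0 below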
+ try:
+ reserved = parse_abbreviated_size(data)
+ except ValueError:
+ log.msg("[storage]reserved_space= contains unparseable value %s"
+ % data)
+ if reserved is None:
+ reserved = 0
discard = self.get_config("storage", "debug_discard", False,
boolean=True)
- ss = StorageServer(storedir, sizelimit, discard, readonly,
- self.stats_provider)
+ ss = StorageServer(storedir,
+ reserved_space=reserved,
+ discard_storage=discard,
+ readonly_storage=readonly,
+ stats_provider=self.stats_provider)
self.add_service(ss)
d = self.when_tub_ready()
# we can't do registerReference until the Tub is ready
storage_enabled = not config.get("no-storage", None)
c.write("enabled = %s\n" % boolstr[storage_enabled])
c.write("#readonly =\n")
- c.write("#sizelimit =\n")
+ c.write("#reserved_space =\n")
c.write("\n")
c.write("[helper]\n")
"application-version": str(allmydata.__version__),
}
- def __init__(self, storedir, sizelimit=None,
+ def __init__(self, storedir, reserved_space=0,
discard_storage=False, readonly_storage=False,
stats_provider=None):
service.MultiService.__init__(self)
# we don't actually create the corruption-advisory dir until necessary
self.corruption_advisory_dir = os.path.join(storedir,
"corruption-advisories")
- self.sizelimit = sizelimit
+ self.reserved_space = int(reserved_space)
self.no_storage = discard_storage
self.readonly_storage = readonly_storage
self.stats_provider = stats_provider
self._clean_incomplete()
fileutil.make_dirs(self.incomingdir)
self._active_writers = weakref.WeakKeyDictionary()
- lp = log.msg("StorageServer created, now measuring space..",
- facility="tahoe.storage")
- self.consumed = None
- if self.sizelimit:
- self.consumed = fileutil.du(self.sharedir)
- log.msg(format="space measurement done, consumed=%(consumed)d bytes",
- consumed=self.consumed,
- parent=lp, facility="tahoe.storage")
+ lp = log.msg("StorageServer created", facility="tahoe.storage")
+
+ if reserved_space:
+ if self.get_available_space() is None:
+ log.msg("warning: [storage]reserved_space= is set, but this platform does not support statvfs(2), so this reservation cannot be honored",
+ umin="0wZ27w", level=log.UNUSUAL)
+
self.latencies = {"allocate": [], # immutable
"write": [],
"close": [],
def get_stats(self):
stats = { 'storage_server.allocated': self.allocated_size(), }
- if self.consumed is not None:
- stats['storage_server.consumed'] = self.consumed
for category,ld in self.get_latencies().items():
for name,v in ld.items():
stats['storage_server.latencies.%s.%s' % (category, name)] = v
+ writeable = not self.readonly_storage
try:
s = os.statvfs(self.storedir)
disk_total = s.f_bsize * s.f_blocks
disk_used = s.f_bsize * (s.f_blocks - s.f_bfree)
# spacetime predictors should look at the slope of disk_used.
disk_avail = s.f_bsize * s.f_bavail # available to non-root users
- # TODO: include our local policy here: if we stop accepting
- # shares when the available space drops below 1GB, then include
- # that fact in disk_avail.
- #
+ # include our local policy here: since we stop accepting shares when
+ # the available space drops below reserved_space, reflect that fact in
+ # disk_avail.
+ disk_avail -= self.reserved_space
+ disk_avail = max(disk_avail, 0)
+ if self.readonly_storage:
+ disk_avail = 0
+ if disk_avail == 0:
+ writeable = False
+
# spacetime predictors should use disk_avail / (d(disk_used)/dt)
stats["storage_server.disk_total"] = disk_total
stats["storage_server.disk_used"] = disk_used
except AttributeError:
# os.statvfs is only available on unix
pass
+ stats["storage_server.accepting_immutable_shares"] = writeable
return stats
+
+ def stat_disk(self, d):
+ s = os.statvfs(d)
+ # s.f_bavail: available to non-root users
+ disk_avail = s.f_bsize * s.f_bavail
+ return disk_avail
+
+ def get_available_space(self):
+ # returns None if it cannot be measured (windows)
+ try:
+ disk_avail = self.stat_disk(self.storedir)
+ except AttributeError:
+ return None
+ disk_avail -= self.reserved_space
+ if self.readonly_storage:
+ disk_avail = 0
+ return disk_avail
+
def allocated_size(self):
- space = self.consumed or 0
+ space = 0
for bw in self._active_writers:
space += bw.allocated_size()
return space
expire_time, self.my_nodeid)
space_per_bucket = allocated_size
- no_limits = self.sizelimit is None
- yes_limits = not no_limits
- if yes_limits:
- remaining_space = self.sizelimit - self.allocated_size()
+
+ remaining_space = self.get_available_space()
+ limited = remaining_space is not None
+ if limited:
+ # this is a bit conservative, since some of this allocated_size()
+ # has already been written to disk, where it will show up in
+ # get_available_space.
+ remaining_space -= self.allocated_size()
# fill alreadygot with all shares that we have, not just the ones
# they asked about: this will save them a lot of work. Add or update
sf = ShareFile(fn)
sf.add_or_renew_lease(lease_info)
- if self.readonly_storage:
- # we won't accept new shares
- self.add_latency("allocate", time.time() - start)
- return alreadygot, bucketwriters
+ # self.readonly_storage causes remaining_space=0
for shnum in sharenums:
incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
# occurs while the first is still in progress, the second
# uploader will use different storage servers.
pass
- elif no_limits or remaining_space >= space_per_bucket:
+ elif (not limited) or (remaining_space >= space_per_bucket):
# ok! we need to create the new share file.
bw = BucketWriter(self, incominghome, finalhome,
space_per_bucket, lease_info, canary)
bw.throw_out_all_data = True
bucketwriters[shnum] = bw
self._active_writers[bw] = 1
- if yes_limits:
+ if limited:
remaining_space -= space_per_bucket
else:
# bummer! not enough space to accept this bucket
if not os.listdir(storagedir):
os.rmdir(storagedir)
- if self.consumed is not None:
- self.consumed -= total_space_freed
if self.stats_provider:
self.stats_provider.count('storage_server.bytes_freed',
total_space_freed)
raise IndexError("no such storage index")
def bucket_writer_closed(self, bw, consumed_size):
- if self.consumed is not None:
- self.consumed += consumed_size
if self.stats_provider:
self.stats_provider.count('storage_server.bytes_added', consumed_size)
del self._active_writers[bw]
if self.mode in ("receive",):
# for this mode, the client-under-test gets all the shares,
# so our internal nodes can refuse requests
- f = open(os.path.join(nodedir, "sizelimit"), "w")
- f.write("0\n")
+ f = open(os.path.join(nodedir, "readonly_storage"), "w")
+ f.write("\n")
f.close()
c = self.add_service(client.Client(basedir=nodedir))
self.nodes.append(c)
# client[0] runs a webserver and a helper, no key_generator
write("webport", "tcp:0:interface=127.0.0.1")
write("run_helper", "yes")
- write("sizelimit", "10GB")
write("keepalive_timeout", "600")
if i == 3:
# client[3] runs a webserver and uses a helper, uses
def _uploaded(res):
(stdout, stderr) = res
self.failUnless("waiting for file data on stdin.." in stderr)
- self.failUnless("200 OK" in stderr)
+ self.failUnless("200 OK" in stderr, stderr)
self.readcap = stdout
self.failUnless(self.readcap.startswith("URI:CHK:"))
d.addCallback(_uploaded)
cancel_secret = c.get_cancel_secret()
self.failUnless(base32.b2a(cancel_secret))
- def test_sizelimit_1(self):
- basedir = "client.Basic.test_sizelimit_1"
+ BASECONFIG = ("[client]\n"
+ "introducer.furl = \n"
+ )
+
+ def test_reserved_1(self):
+ basedir = "client.Basic.test_reserved_1"
os.mkdir(basedir)
- open(os.path.join(basedir, "introducer.furl"), "w").write("")
- open(os.path.join(basedir, "vdrive.furl"), "w").write("")
- open(os.path.join(basedir, "sizelimit"), "w").write("1000")
+ f = open(os.path.join(basedir, "tahoe.cfg"), "w")
+ f.write(self.BASECONFIG)
+ f.write("[storage]\n")
+ f.write("enabled = true\n")
+ f.write("reserved_space = 1000\n")
+ f.close()
c = client.Client(basedir)
- self.failUnlessEqual(c.getServiceNamed("storage").sizelimit, 1000)
+ self.failUnlessEqual(c.getServiceNamed("storage").reserved_space, 1000)
- def test_sizelimit_2(self):
- basedir = "client.Basic.test_sizelimit_2"
+ def test_reserved_2(self):
+ basedir = "client.Basic.test_reserved_2"
os.mkdir(basedir)
- open(os.path.join(basedir, "introducer.furl"), "w").write("")
- open(os.path.join(basedir, "vdrive.furl"), "w").write("")
- open(os.path.join(basedir, "sizelimit"), "w").write("10K")
+ f = open(os.path.join(basedir, "tahoe.cfg"), "w")
+ f.write(self.BASECONFIG)
+ f.write("[storage]\n")
+ f.write("enabled = true\n")
+ f.write("reserved_space = 10K\n")
+ f.close()
c = client.Client(basedir)
- self.failUnlessEqual(c.getServiceNamed("storage").sizelimit, 10*1000)
+ self.failUnlessEqual(c.getServiceNamed("storage").reserved_space, 10*1000)
- def test_sizelimit_3(self):
- basedir = "client.Basic.test_sizelimit_3"
+ def test_reserved_3(self):
+ basedir = "client.Basic.test_reserved_3"
os.mkdir(basedir)
- open(os.path.join(basedir, "introducer.furl"), "w").write("")
- open(os.path.join(basedir, "vdrive.furl"), "w").write("")
- open(os.path.join(basedir, "sizelimit"), "w").write("5mB")
+ f = open(os.path.join(basedir, "tahoe.cfg"), "w")
+ f.write(self.BASECONFIG)
+ f.write("[storage]\n")
+ f.write("enabled = true\n")
+ f.write("reserved_space = 5mB\n")
+ f.close()
c = client.Client(basedir)
- self.failUnlessEqual(c.getServiceNamed("storage").sizelimit,
+ self.failUnlessEqual(c.getServiceNamed("storage").reserved_space,
5*1000*1000)
- def test_sizelimit_4(self):
- basedir = "client.Basic.test_sizelimit_4"
+ def test_reserved_4(self):
+ basedir = "client.Basic.test_reserved_4"
os.mkdir(basedir)
- open(os.path.join(basedir, "introducer.furl"), "w").write("")
- open(os.path.join(basedir, "vdrive.furl"), "w").write("")
- open(os.path.join(basedir, "sizelimit"), "w").write("78Gb")
+ f = open(os.path.join(basedir, "tahoe.cfg"), "w")
+ f.write(self.BASECONFIG)
+ f.write("[storage]\n")
+ f.write("enabled = true\n")
+ f.write("reserved_space = 78Gb\n")
+ f.close()
c = client.Client(basedir)
- self.failUnlessEqual(c.getServiceNamed("storage").sizelimit,
+ self.failUnlessEqual(c.getServiceNamed("storage").reserved_space,
78*1000*1000*1000)
- def test_sizelimit_bad(self):
- basedir = "client.Basic.test_sizelimit_bad"
+ def test_reserved_bad(self):
+ basedir = "client.Basic.test_reserved_bad"
os.mkdir(basedir)
- open(os.path.join(basedir, "introducer.furl"), "w").write("")
- open(os.path.join(basedir, "vdrive.furl"), "w").write("")
- open(os.path.join(basedir, "sizelimit"), "w").write("bogus")
+ f = open(os.path.join(basedir, "tahoe.cfg"), "w")
+ f.write(self.BASECONFIG)
+ f.write("[storage]\n")
+ f.write("enabled = true\n")
+ f.write("reserved_space = bogus\n")
+ f.close()
c = client.Client(basedir)
- self.failUnlessEqual(c.getServiceNamed("storage").sizelimit, None)
+ self.failUnlessEqual(c.getServiceNamed("storage").reserved_space, 0)
def _permute(self, c, key):
return [ peerid
return self._do_test_readwrite("test_readwrite_v2",
0x44, WriteBucketProxy_v2, ReadBucketProxy)
+class FakeDiskStorageServer(StorageServer):
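+ # test double: reports a fixed amount of available disk space (DISKAVAIL)
+ # instead of calling os.statvfs(), so the reserved_space logic can be
+ # exercised deterministically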
+ def stat_disk(self, d):
+ return self.DISKAVAIL
+
class Server(unittest.TestCase):
def setUp(self):
basedir = os.path.join("storage", "Server", name)
return basedir
- def create(self, name, sizelimit=None):
+ def create(self, name, reserved_space=0, klass=StorageServer):
workdir = self.workdir(name)
- ss = StorageServer(workdir, sizelimit,
- stats_provider=FakeStatsProvider())
+ ss = klass(workdir, reserved_space=reserved_space,
+ stats_provider=FakeStatsProvider())
ss.setNodeID("\x00" * 20)
ss.setServiceParent(self.sparent)
return ss
self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
- def test_sizelimits(self):
- ss = self.create("test_sizelimits", 5000)
+ def test_reserved_space(self):
+ ss = self.create("test_reserved_space", reserved_space=10000,
+ klass=FakeDiskStorageServer)
+ # the FakeDiskStorageServer doesn't do real statvfs() calls
+ ss.DISKAVAIL = 15000
+ # 15k available, 10k reserved, leaves 5k for shares
+
# a newly created and filled share incurs this much overhead, beyond
# the size we request.
OVERHEAD = 3*4
self.failUnlessEqual(len(ss._active_writers), 0)
allocated = 1001 + OVERHEAD + LEASE_SIZE
+
+ # we have to manually decrease DISKAVAIL, since we're not doing real
+ # disk measurements and the fake server never consumes actual space
+ ss.DISKAVAIL -= allocated
+
# now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
# 5000-1085=3915 free, therefore we can fit 39 100byte shares
already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
ss.disownServiceParent()
del ss
- # creating a new StorageServer in the same directory should see the
- # same usage.
-
- # metadata that goes into the share file is counted upon share close,
- # as well as at startup. metadata that goes into other files will not
- # be counted until the next startup, so if we were creating any
- # extra-file metadata, the allocation would be more than 'allocated'
- # and this test would need to be changed.
- ss = self.create("test_sizelimits", 5000)
- already4,writers4 = self.allocate(ss, "vid4", range(100), 100, canary)
- self.failUnlessEqual(len(writers4), 39)
- self.failUnlessEqual(len(ss._active_writers), 39)
-
def test_seek(self):
basedir = self.workdir("test_seek_behavior")
fileutil.make_dirs(basedir)
self.failUnlessEqual(already, set())
self.failUnlessEqual(writers, {})
+ stats = ss.get_stats()
+ self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"],
+ False)
+ self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
+
def test_discard(self):
# discard is really only used for other tests, but we test it anyways
workdir = self.workdir("test_discard")
basedir = os.path.join("storage", "MutableServer", name)
return basedir
- def create(self, name, sizelimit=None):
+ def create(self, name):
workdir = self.workdir(name)
- ss = StorageServer(workdir, sizelimit)
+ ss = StorageServer(workdir)
ss.setServiceParent(self.sparent)
ss.setNodeID("\x00" * 20)
return ss
self.failUnlessEqual(a.expiration_time, b.expiration_time)
def test_leases(self):
- ss = self.create("test_leases", sizelimit=1000*1000)
+ ss = self.create("test_leases")
def secrets(n):
return ( self.write_enabler("we1"),
self.renew_secret("we1-%d" % n),
basedir = os.path.join("storage", "Server", name)
return basedir
- def create(self, name, sizelimit=None):
+ def create(self, name):
workdir = self.workdir(name)
- ss = StorageServer(workdir, sizelimit)
+ ss = StorageServer(workdir)
ss.setNodeID("\x00" * 20)
ss.setServiceParent(self.sparent)
return ss
add_to_sparent=True))
def _added(extra_node):
self.extra_node = extra_node
- extra_node.getServiceNamed("storage").sizelimit = 0
+ # keep the helper node from storing shares itself: readonly_storage
+ # makes its get_available_space() report zero
+ extra_node.getServiceNamed("storage").readonly_storage = True
d.addCallback(_added)
HELPER_DATA = "Data that needs help to upload" * 1000
ss = client.getServiceNamed("storage")
allocated_s = abbreviate_size(ss.allocated_size())
allocated = "about %s allocated" % allocated_s
- sizelimit = "no size limit"
- if ss.sizelimit is not None:
- sizelimit = "size limit is %s" % abbreviate_size(ss.sizelimit)
- ul[T.li["Storage Server: %s, %s" % (allocated, sizelimit)]]
+ reserved = "%s reserved" % abbreviate_size(ss.reserved_space)
+ ul[T.li["Storage Server: %s, %s" % (allocated, reserved)]]
except KeyError:
ul[T.li["Not running storage server"]]