If a verify=true argument is provided, the node will perform a more
intensive check, downloading and verifying every single bit of every share.
+ If an add-lease=true argument is provided, the node will also add (or
+ renew) a lease to every share it encounters. Each lease will keep the share
+ alive for a certain period of time (one month by default). Once the last
+ lease expires or is explicitly cancelled, the storage server is allowed to
+ delete the share.
+
If an output=JSON argument is provided, the response will be
machine-readable JSON instead of human-oriented HTML. The data is a
dictionary with the following keys:
BAD_REQUEST) will be signalled if it is invoked on a file. The recursive
walker will deal with loops safely.
- This accepts the same verify= argument as t=check.
+ This accepts the same verify= and add-lease= arguments as t=check.
Since this operation can take a long time (perhaps a second per object),
the ophandle= argument is required (see "Slow Operations, Progress, and
or corrupted), it will perform a "repair". During repair, any missing
shares will be regenerated and uploaded to new servers.
- This accepts the same verify=true argument as t=check. When an output=JSON
- argument is provided, the machine-readable JSON response will contain the
- following keys:
+ This accepts the same verify=true and add-lease= arguments as t=check. When
+ an output=JSON argument is provided, the machine-readable JSON response
+ will contain the following keys:
storage-index: a base32-encoded string with the objects's storage index,
or an empty string for LIT files
invoked on a directory. An error (400 BAD_REQUEST) will be signalled if it
is invoked on a file. The recursive walker will deal with loops safely.
- This accepts the same verify=true argument as t=start-deep-check. It uses
- the same ophandle= mechanism as start-deep-check. When an output=JSON
- argument is provided, the response will contain the following keys:
+ This accepts the same verify= and add-lease= arguments as
+ t=start-deep-check. It uses the same ophandle= mechanism as
+ start-deep-check. When an output=JSON argument is provided, the response
+ will contain the following keys:
finished: (bool) True if the operation has completed, else False
root-storage-index: a base32-encoded string with the storage index of the
def get_storage_index(self):
return self._uri._filenode_uri.storage_index
- def check(self, monitor, verify=False):
+ def check(self, monitor, verify=False, add_lease=False):
"""Perform a file check. See IChecker.check for details."""
- return self._node.check(monitor, verify)
- def check_and_repair(self, monitor, verify=False):
- return self._node.check_and_repair(monitor, verify)
+ return self._node.check(monitor, verify, add_lease)
+ def check_and_repair(self, monitor, verify=False, add_lease=False):
+ return self._node.check_and_repair(monitor, verify, add_lease)
def list(self):
"""I return a Deferred that fires with a dictionary mapping child
# children for which we've got both a write-cap and a read-cap
return self.deep_traverse(DeepStats(self))
- def start_deep_check(self, verify=False):
- return self.deep_traverse(DeepChecker(self, verify, repair=False))
+ def start_deep_check(self, verify=False, add_lease=False):
+ return self.deep_traverse(DeepChecker(self, verify, repair=False, add_lease=add_lease))
- def start_deep_check_and_repair(self, verify=False):
- return self.deep_traverse(DeepChecker(self, verify, repair=True))
+ def start_deep_check_and_repair(self, verify=False, add_lease=False):
+ return self.deep_traverse(DeepChecker(self, verify, repair=True, add_lease=add_lease))
class DeepChecker:
- def __init__(self, root, verify, repair):
+ def __init__(self, root, verify, repair, add_lease):
root_si = root.get_storage_index()
self._lp = log.msg(format="deep-check starting (%(si)s),"
" verify=%(verify)s, repair=%(repair)s",
si=base32.b2a(root_si), verify=verify, repair=repair)
self._verify = verify
self._repair = repair
+ self._add_lease = add_lease
if repair:
self._results = DeepCheckAndRepairResults(root_si)
else:
def add_node(self, node, childpath):
if self._repair:
- d = node.check_and_repair(self.monitor, self._verify)
+ d = node.check_and_repair(self.monitor, self._verify, self._add_lease)
d.addCallback(self._results.add_check_and_repair, childpath)
else:
- d = node.check(self.monitor, self._verify)
+ d = node.check(self.monitor, self._verify, self._add_lease)
d.addCallback(self._results.add_check, childpath)
d.addCallback(lambda ignored: self._stats.add_node(node, childpath))
return d
from foolscap import DeadReferenceError
+from twisted.internet import defer
from allmydata import hashtree
from allmydata.check_results import CheckResults
from allmydata.immutable import download
from allmydata.uri import CHKFileVerifierURI
from allmydata.util.assertutil import precondition
from allmydata.util import base32, deferredutil, dictutil, log, rrefutil
+from allmydata.util.hashutil import file_renewal_secret_hash, \
+ file_cancel_secret_hash, bucket_renewal_secret_hash, \
+ bucket_cancel_secret_hash
from allmydata.immutable import layout
object that was passed into my constructor whether this task has been
cancelled (by invoking its raise_if_cancelled() method).
"""
- def __init__(self, client, verifycap, servers, verify, monitor):
+ def __init__(self, client, verifycap, servers, verify, add_lease, monitor):
assert precondition(isinstance(verifycap, CHKFileVerifierURI), verifycap, type(verifycap))
assert precondition(isinstance(servers, (set, frozenset)), servers)
for (serverid, serverrref) in servers:
self._monitor = monitor
self._servers = servers
self._verify = verify # bool: verify what the servers claim, or not?
+ self._add_lease = add_lease
self._share_hash_tree = None
+ frs = file_renewal_secret_hash(client.get_renewal_secret(),
+ self._verifycap.storage_index)
+ self.file_renewal_secret = frs
+ fcs = file_cancel_secret_hash(client.get_cancel_secret(),
+ self._verifycap.storage_index)
+ self.file_cancel_secret = fcs
+
+ def _get_renewal_secret(self, peerid):
+ return bucket_renewal_secret_hash(self.file_renewal_secret, peerid)
+ def _get_cancel_secret(self, peerid):
+ return bucket_cancel_secret_hash(self.file_cancel_secret, peerid)
+
def _get_buckets(self, server, storageindex, serverid):
"""Return a deferred that eventually fires with ({sharenum: bucket},
serverid, success). In case the server is disconnected or returns a
responded.)"""
d = server.callRemote("get_buckets", storageindex)
+ if self._add_lease:
+ renew_secret = self._get_renewal_secret(serverid)
+ cancel_secret = self._get_cancel_secret(serverid)
+ d2 = server.callRemote("add_lease", storageindex,
+ renew_secret, cancel_secret)
+ dl = defer.DeferredList([d, d2])
+ def _done(res):
+ [(get_success, get_result),
+ (addlease_success, addlease_result)] = res
+ if (not addlease_success and
+ not addlease_result.check(IndexError)):
+ # tahoe=1.3.0 raised IndexError on non-existant buckets,
+ # which we ignore. But report others, including the
+ # unfortunate internal KeyError bug that <1.3.0 had.
+ return addlease_result # propagate error
+ return get_result
+ dl.addCallback(_done)
+ d = dl
def _wrap_results(res):
for k in res:
def get_storage_index(self):
return self.u.storage_index
- def check_and_repair(self, monitor, verify=False):
+ def check_and_repair(self, monitor, verify=False, add_lease=False):
verifycap = self.get_verify_cap()
servers = self._client.get_servers("storage")
- c = Checker(client=self._client, verifycap=verifycap, servers=servers, verify=verify, monitor=monitor)
+ c = Checker(client=self._client, verifycap=verifycap, servers=servers,
+ verify=verify, add_lease=add_lease, monitor=monitor)
d = c.start()
def _maybe_repair(cr):
crr = CheckAndRepairResults(self.u.storage_index)
d.addCallback(_maybe_repair)
return d
- def check(self, monitor, verify=False):
- v = Checker(client=self._client, verifycap=self.get_verify_cap(), servers=self._client.get_servers("storage"), verify=verify, monitor=monitor)
+ def check(self, monitor, verify=False, add_lease=False):
+ v = Checker(client=self._client, verifycap=self.get_verify_cap(),
+ servers=self._client.get_servers("storage"),
+ verify=verify, add_lease=add_lease, monitor=monitor)
return v.start()
def read(self, consumer, offset=0, size=None):
def get_storage_index(self):
return None
- def check(self, monitor, verify=False):
+ def check(self, monitor, verify=False, add_lease=False):
return defer.succeed(None)
- def check_and_repair(self, monitor, verify=False):
+ def check_and_repair(self, monitor, verify=False, add_lease=False):
return defer.succeed(None)
def read(self, consumer, offset=0, size=None):
"""TODO: how should this work?"""
class ICheckable(Interface):
- def check(monitor, verify=False):
+ def check(monitor, verify=False, add_lease=False):
"""Check upon my health, optionally repairing any problems.
This returns a Deferred that fires with an instance that provides
failures during retrieval, or is malicious or buggy, then
verification will detect the problem, but checking will not.
+ If add_lease=True, I will ensure that an up-to-date lease is present
+ on each share. The lease secrets will be derived from by node secret
+ (in BASEDIR/private/secret), so either I will add a new lease to the
+ share, or I will merely renew the lease that I already had. In a
+ future version of the storage-server protocol (once Accounting has
+ been implemented), there may be additional options here to define the
+ kind of lease that is obtained (which account number to claim, etc).
+
TODO: any problems seen during checking will be reported to the
health-manager.furl, a centralized object which is responsible for
figuring out why files are unhealthy so corrective action can be
taken.
"""
- def check_and_repair(monitor, verify=False):
+ def check_and_repair(monitor, verify=False, add_lease=False):
"""Like check(), but if the file/directory is not healthy, attempt to
repair the damage.
ICheckAndRepairResults."""
class IDeepCheckable(Interface):
- def start_deep_check(verify=False):
+ def start_deep_check(verify=False, add_lease=False):
"""Check upon the health of me and everything I can reach.
This is a recursive form of check(), useable only on dirnodes.
object.
"""
- def start_deep_check_and_repair(verify=False):
+ def start_deep_check_and_repair(verify=False, add_lease=False):
"""Check upon the health of me and everything I can reach. Repair
anything that isn't healthy.
self.need_repair = False
self.responded = set() # set of (binary) nodeids
- def check(self, verify=False):
+ def check(self, verify=False, add_lease=False):
servermap = ServerMap()
- u = ServermapUpdater(self._node, self._monitor, servermap, MODE_CHECK)
+ u = ServermapUpdater(self._node, self._monitor, servermap, MODE_CHECK,
+ add_lease=add_lease)
history = self._node._client.get_history()
if history:
history.notify_mapupdate(u.get_status())
self.cr_results.pre_repair_results = self.results
self.need_repair = False
- def check(self, verify=False):
- d = MutableChecker.check(self, verify)
+ def check(self, verify=False, add_lease=False):
+ d = MutableChecker.check(self, verify, add_lease)
d.addCallback(self._maybe_repair)
d.addCallback(lambda res: self.cr_results)
return d
#################################
# ICheckable
- def check(self, monitor, verify=False):
+ def check(self, monitor, verify=False, add_lease=False):
checker = self.checker_class(self, monitor)
- return checker.check(verify)
+ return checker.check(verify, add_lease)
- def check_and_repair(self, monitor, verify=False):
+ def check_and_repair(self, monitor, verify=False, add_lease=False):
checker = self.check_and_repairer_class(self, monitor)
- return checker.check(verify)
+ return checker.check(verify, add_lease)
#################################
# IRepairable
class ServermapUpdater:
- def __init__(self, filenode, monitor, servermap, mode=MODE_READ):
+ def __init__(self, filenode, monitor, servermap, mode=MODE_READ,
+ add_lease=False):
"""I update a servermap, locating a sufficient number of useful
shares and remembering where they are located.
self._monitor = monitor
self._servermap = servermap
self.mode = mode
+ self._add_lease = add_lease
self._running = True
self._storage_index = filenode.get_storage_index()
def _do_read(self, ss, peerid, storage_index, shnums, readv):
d = ss.callRemote("slot_readv", storage_index, shnums, readv)
+ if self._add_lease:
+ renew_secret = self._node.get_renewal_secret(peerid)
+ cancel_secret = self._node.get_cancel_secret(peerid)
+ d2 = ss.callRemote("add_lease", storage_index,
+ renew_secret, cancel_secret)
+ dl = defer.DeferredList([d, d2])
+ def _done(res):
+ [(readv_success, readv_result),
+ (addlease_success, addlease_result)] = res
+ if (not addlease_success and
+ not addlease_result.check(IndexError)):
+ # tahoe 1.3.0 raised IndexError on non-existant buckets,
+ # which we ignore. Unfortunately tahoe <1.3.0 had a bug
+ # and raised KeyError, which we report.
+ return addlease_result # propagate error
+ return readv_result
+ dl.addCallback(_done)
+ return dl
return d
def _got_results(self, datavs, peerid, readsize, stuff, started):
("raw", None, "Display raw JSON data instead of parsed"),
("verify", None, "Verify all hashes, instead of merely querying share presence"),
("repair", None, "Automatically repair any problems found"),
+ ("add-lease", None, "Add/renew lease on all shares"),
]
def parseArgs(self, where=''):
self.where = where
("raw", None, "Display raw JSON data instead of parsed"),
("verify", None, "Verify all hashes, instead of merely querying share presence"),
("repair", None, "Automatically repair any problems found"),
+ ("add-lease", None, "Add/renew lease on all shares"),
("verbose", "v", "Be noisy about what is happening."),
]
def parseArgs(self, where=''):
url += "&verify=true"
if options["repair"]:
url += "&repair=true"
+ if options["add-lease"]:
+ url += "&add-lease=true"
resp = do_http("POST", url)
if resp.status != 200:
output = DeepCheckAndRepairOutput(options)
else:
output = DeepCheckOutput(options)
+ if options["add-lease"]:
+ url += "&add-lease=true"
resp = do_http("POST", url)
if resp.status not in (200, 302):
print >>stderr, "ERROR", resp.status, resp.reason, resp.read()
def get_storage_index(self):
return self.storage_index
- def check(self, monitor, verify=False):
+ def check(self, monitor, verify=False, add_lease=False):
r = CheckResults(self.my_uri, self.storage_index)
is_bad = self.bad_shares.get(self.storage_index, None)
data = {}
r.set_data(data)
r.set_needs_rebalancing(False)
return defer.succeed(r)
- def check_and_repair(self, monitor, verify=False):
+ def check_and_repair(self, monitor, verify=False, add_lease=False):
d = self.check(verify)
def _got(cr):
r = CheckAndRepairResults(self.storage_index)
def get_storage_index(self):
return self.storage_index
- def check(self, monitor, verify=False):
+ def check(self, monitor, verify=False, add_lease=False):
r = CheckResults(self.my_uri, self.storage_index)
is_bad = self.bad_shares.get(self.storage_index, None)
data = {}
r.set_needs_rebalancing(False)
return defer.succeed(r)
- def check_and_repair(self, monitor, verify=False):
+ def check_and_repair(self, monitor, verify=False, add_lease=False):
d = self.check(verify)
def _got(cr):
r = CheckAndRepairResults(self.storage_index)
d.addCallback(_got)
return d
- def deep_check(self, verify=False):
+ def deep_check(self, verify=False, add_lease=False):
d = self.check(verify)
def _done(r):
dr = DeepCheckResults(self.storage_index)
d.addCallback(_done)
return d
- def deep_check_and_repair(self, verify=False):
+ def deep_check_and_repair(self, verify=False, add_lease=False):
d = self.check_and_repair(verify)
def _done(r):
dr = DeepCheckAndRepairResults(self.storage_index)
def tearDown(self):
return self.s.stopService()
- def set_up_grid(self, client_config_hooks={}):
+ def set_up_grid(self, num_clients=1, client_config_hooks={}):
# self.basedir must be set
- self.g = NoNetworkGrid(self.basedir,
+ self.g = NoNetworkGrid(self.basedir, num_clients=num_clients,
client_config_hooks=client_config_hooks)
self.g.setServiceParent(self.s)
self.client_webports = [c.getServiceNamed("webish").listener._port.getHost().port
def get_storage_index(self):
return self.storage_index
- def check(self, monitor, verify=False):
+ def check(self, monitor, verify=False, add_lease=False):
r = CheckResults(uri.from_string(self.nodeuri), None)
r.set_healthy(True)
r.set_recoverable(True)
return defer.succeed(r)
- def check_and_repair(self, monitor, verify=False):
+ def check_and_repair(self, monitor, verify=False, add_lease=False):
d = self.check(verify)
def _got(cr):
r = CheckAndRepairResults(None)
from twisted.internet import defer, reactor
from twisted.web import client, error, http
from twisted.python import failure, log
-from allmydata import interfaces, uri, webish
+from allmydata import interfaces, uri, webish, storage
from allmydata.immutable import upload, download
from allmydata.web import status, common
from allmydata.scripts.debug import CorruptShareOptions, corrupt_share
class Grid(GridTestMixin, WebErrorMixin, unittest.TestCase):
def GET(self, urlpath, followRedirect=False, return_response=False,
- method="GET", **kwargs):
+ method="GET", clientnum=0, **kwargs):
# if return_response=True, this fires with (data, statuscode,
# respheaders) instead of just data.
assert not isinstance(urlpath, unicode)
- url = self.client_baseurls[0] + urlpath
+ url = self.client_baseurls[clientnum] + urlpath
factory = HTTPClientGETFactory(url, method=method,
followRedirect=followRedirect, **kwargs)
- reactor.connectTCP("localhost", self.client_webports[0], factory)
+ reactor.connectTCP("localhost", self.client_webports[clientnum],factory)
d = factory.deferred
def _got_data(data):
return (data, factory.status, factory.response_headers)
d.addCallback(_got_data)
return factory.deferred
- def CHECK(self, ign, which, args):
+ def CHECK(self, ign, which, args, clientnum=0):
fileurl = self.fileurls[which]
url = fileurl + "?" + args
- return self.GET(url, method="POST")
+ return self.GET(url, method="POST", clientnum=clientnum)
def test_filecheck(self):
self.basedir = "web/Grid/filecheck"
d.addErrback(self.explain_web_error)
return d
+
+ def _count_leases(self, ignored, which):
+ u = self.uris[which]
+ shares = self.find_shares(u)
+ lease_counts = []
+ for shnum, serverid, fn in shares:
+ if u.startswith("URI:SSK") or u.startswith("URI:DIR2"):
+ sf = storage.MutableShareFile(fn)
+ num_leases = len(sf.debug_get_leases())
+ elif u.startswith("URI:CHK"):
+ sf = storage.ShareFile(fn)
+ num_leases = len(list(sf.iter_leases()))
+ else:
+ raise RuntimeError("can't get leases on %s" % u)
+ lease_counts.append( (fn, num_leases) )
+ return lease_counts
+
+ def _assert_leasecount(self, lease_counts, expected):
+ for (fn, num_leases) in lease_counts:
+ if num_leases != expected:
+ self.fail("expected %d leases, have %d, on %s" %
+ (expected, num_leases, fn))
+
+ def test_add_lease(self):
+ self.basedir = "web/Grid/add_lease"
+ self.set_up_grid(num_clients=2)
+ c0 = self.g.clients[0]
+ self.uris = {}
+ DATA = "data" * 100
+ d = c0.upload(upload.Data(DATA, convergence=""))
+ def _stash_uri(ur, which):
+ self.uris[which] = ur.uri
+ d.addCallback(_stash_uri, "one")
+ d.addCallback(lambda ign:
+ c0.upload(upload.Data(DATA+"1", convergence="")))
+ d.addCallback(_stash_uri, "two")
+ def _stash_mutable_uri(n, which):
+ self.uris[which] = n.get_uri()
+ assert isinstance(self.uris[which], str)
+ d.addCallback(lambda ign: c0.create_mutable_file(DATA+"2"))
+ d.addCallback(_stash_mutable_uri, "mutable")
+
+ def _compute_fileurls(ignored):
+ self.fileurls = {}
+ for which in self.uris:
+ self.fileurls[which] = "uri/" + urllib.quote(self.uris[which])
+ d.addCallback(_compute_fileurls)
+
+ d.addCallback(self._count_leases, "one")
+ d.addCallback(self._assert_leasecount, 1)
+ d.addCallback(self._count_leases, "two")
+ d.addCallback(self._assert_leasecount, 1)
+ d.addCallback(self._count_leases, "mutable")
+ d.addCallback(self._assert_leasecount, 1)
+
+ d.addCallback(self.CHECK, "one", "t=check") # no add-lease
+ def _got_html_good(res):
+ self.failUnless("Healthy" in res, res)
+ self.failIf("Not Healthy" in res, res)
+ d.addCallback(_got_html_good)
+
+ d.addCallback(self._count_leases, "one")
+ d.addCallback(self._assert_leasecount, 1)
+ d.addCallback(self._count_leases, "two")
+ d.addCallback(self._assert_leasecount, 1)
+ d.addCallback(self._count_leases, "mutable")
+ d.addCallback(self._assert_leasecount, 1)
+
+ # this CHECK uses the original client, which uses the same
+ # lease-secrets, so it will just renew the original lease
+ d.addCallback(self.CHECK, "one", "t=check&add-lease=true")
+ d.addCallback(_got_html_good)
+
+ d.addCallback(self._count_leases, "one")
+ d.addCallback(self._assert_leasecount, 1)
+ d.addCallback(self._count_leases, "two")
+ d.addCallback(self._assert_leasecount, 1)
+ d.addCallback(self._count_leases, "mutable")
+ d.addCallback(self._assert_leasecount, 1)
+
+ # this CHECK uses an alternate client, which adds a second lease
+ d.addCallback(self.CHECK, "one", "t=check&add-lease=true", clientnum=1)
+ d.addCallback(_got_html_good)
+
+ d.addCallback(self._count_leases, "one")
+ d.addCallback(self._assert_leasecount, 2)
+ d.addCallback(self._count_leases, "two")
+ d.addCallback(self._assert_leasecount, 1)
+ d.addCallback(self._count_leases, "mutable")
+ d.addCallback(self._assert_leasecount, 1)
+
+ d.addCallback(self.CHECK, "mutable", "t=check&add-lease=true")
+ d.addCallback(_got_html_good)
+
+ d.addCallback(self._count_leases, "one")
+ d.addCallback(self._assert_leasecount, 2)
+ d.addCallback(self._count_leases, "two")
+ d.addCallback(self._assert_leasecount, 1)
+ d.addCallback(self._count_leases, "mutable")
+ d.addCallback(self._assert_leasecount, 1)
+
+ d.addCallback(self.CHECK, "mutable", "t=check&add-lease=true",
+ clientnum=1)
+ d.addCallback(_got_html_good)
+
+ d.addCallback(self._count_leases, "one")
+ d.addCallback(self._assert_leasecount, 2)
+ d.addCallback(self._count_leases, "two")
+ d.addCallback(self._assert_leasecount, 1)
+ d.addCallback(self._count_leases, "mutable")
+ d.addCallback(self._assert_leasecount, 2)
+
+ d.addErrback(self.explain_web_error)
+ return d
+
+ def test_deep_add_lease(self):
+ self.basedir = "web/Grid/deep_add_lease"
+ self.set_up_grid(num_clients=2)
+ c0 = self.g.clients[0]
+ self.uris = {}
+ self.fileurls = {}
+ DATA = "data" * 100
+ d = c0.create_empty_dirnode()
+ def _stash_root_and_create_file(n):
+ self.rootnode = n
+ self.uris["root"] = n.get_uri()
+ self.fileurls["root"] = "uri/" + urllib.quote(n.get_uri()) + "/"
+ return n.add_file(u"one", upload.Data(DATA, convergence=""))
+ d.addCallback(_stash_root_and_create_file)
+ def _stash_uri(fn, which):
+ self.uris[which] = fn.get_uri()
+ d.addCallback(_stash_uri, "one")
+ d.addCallback(lambda ign:
+ self.rootnode.add_file(u"small",
+ upload.Data("literal",
+ convergence="")))
+ d.addCallback(_stash_uri, "small")
+
+ d.addCallback(lambda ign: c0.create_mutable_file("mutable"))
+ d.addCallback(lambda fn: self.rootnode.set_node(u"mutable", fn))
+ d.addCallback(_stash_uri, "mutable")
+
+ d.addCallback(self.CHECK, "root", "t=stream-deep-check") # no add-lease
+ def _done(res):
+ units = [simplejson.loads(line)
+ for line in res.splitlines()
+ if line]
+ # root, one, small, mutable, stats
+ self.failUnlessEqual(len(units), 4+1)
+ d.addCallback(_done)
+
+ d.addCallback(self._count_leases, "root")
+ d.addCallback(self._assert_leasecount, 1)
+ d.addCallback(self._count_leases, "one")
+ d.addCallback(self._assert_leasecount, 1)
+ d.addCallback(self._count_leases, "mutable")
+ d.addCallback(self._assert_leasecount, 1)
+
+ d.addCallback(self.CHECK, "root", "t=stream-deep-check&add-lease=true")
+ d.addCallback(_done)
+
+ d.addCallback(self._count_leases, "root")
+ d.addCallback(self._assert_leasecount, 1)
+ d.addCallback(self._count_leases, "one")
+ d.addCallback(self._assert_leasecount, 1)
+ d.addCallback(self._count_leases, "mutable")
+ d.addCallback(self._assert_leasecount, 1)
+
+ d.addCallback(self.CHECK, "root", "t=stream-deep-check&add-lease=true",
+ clientnum=1)
+ d.addCallback(_done)
+
+ d.addCallback(self._count_leases, "root")
+ d.addCallback(self._assert_leasecount, 2)
+ d.addCallback(self._count_leases, "one")
+ d.addCallback(self._assert_leasecount, 2)
+ d.addCallback(self._count_leases, "mutable")
+ d.addCallback(self._assert_leasecount, 2)
+
+ d.addErrback(self.explain_web_error)
+ return d
# check this directory
verify = boolean_of_arg(get_arg(req, "verify", "false"))
repair = boolean_of_arg(get_arg(req, "repair", "false"))
+ add_lease = boolean_of_arg(get_arg(req, "add-lease", "false"))
if repair:
- d = self.node.check_and_repair(Monitor(), verify)
+ d = self.node.check_and_repair(Monitor(), verify, add_lease)
d.addCallback(lambda res: CheckAndRepairResults(res))
else:
- d = self.node.check(Monitor(), verify)
+ d = self.node.check(Monitor(), verify, add_lease)
d.addCallback(lambda res: CheckResults(res))
return d
raise NeedOperationHandleError("slow operation requires ophandle=")
verify = boolean_of_arg(get_arg(ctx, "verify", "false"))
repair = boolean_of_arg(get_arg(ctx, "repair", "false"))
+ add_lease = boolean_of_arg(get_arg(ctx, "add-lease", "false"))
if repair:
- monitor = self.node.start_deep_check_and_repair(verify)
+ monitor = self.node.start_deep_check_and_repair(verify, add_lease)
renderer = DeepCheckAndRepairResults(monitor)
else:
- monitor = self.node.start_deep_check(verify)
+ monitor = self.node.start_deep_check(verify, add_lease)
renderer = DeepCheckResults(monitor)
return self._start_operation(monitor, renderer, ctx)
def _POST_stream_deep_check(self, ctx):
verify = boolean_of_arg(get_arg(ctx, "verify", "false"))
repair = boolean_of_arg(get_arg(ctx, "repair", "false"))
- walker = DeepCheckStreamer(ctx, self.node, verify, repair)
+ add_lease = boolean_of_arg(get_arg(ctx, "add-lease", "false"))
+ walker = DeepCheckStreamer(ctx, self.node, verify, repair, add_lease)
monitor = self.node.deep_traverse(walker)
walker.setMonitor(monitor)
# register to hear stopProducing. The walker ignores pauseProducing.
class DeepCheckStreamer(dirnode.DeepStats):
implements(IPushProducer)
- def __init__(self, ctx, origin, verify, repair):
+ def __init__(self, ctx, origin, verify, repair, add_lease):
dirnode.DeepStats.__init__(self, origin)
self.req = IRequest(ctx)
self.verify = verify
self.repair = repair
+ self.add_lease = add_lease
def setMonitor(self, monitor):
self.monitor = monitor
data["storage-index"] = si
if self.repair:
- d = node.check_and_repair(self.monitor, self.verify)
+ d = node.check_and_repair(self.monitor, self.verify, self.add_lease)
d.addCallback(self.add_check_and_repair, data)
else:
- d = node.check(self.monitor, self.verify)
+ d = node.check(self.monitor, self.verify, self.add_lease)
d.addCallback(self.add_check, data)
d.addCallback(self.write_line)
return d
def _POST_check(self, req):
verify = boolean_of_arg(get_arg(req, "verify", "false"))
repair = boolean_of_arg(get_arg(req, "repair", "false"))
+ add_lease = boolean_of_arg(get_arg(req, "add-lease", "false"))
if isinstance(self.node, LiteralFileNode):
return defer.succeed(LiteralCheckResults())
if repair:
- d = self.node.check_and_repair(Monitor(), verify)
+ d = self.node.check_and_repair(Monitor(), verify, add_lease)
d.addCallback(lambda res: CheckAndRepairResults(res))
else:
- d = self.node.check(Monitor(), verify)
+ d = self.node.check(Monitor(), verify, add_lease)
d.addCallback(lambda res: CheckResults(res))
return d