# do not import any allmydata modules at this level. Do that from inside
# individual functions instead.
import struct, time, os
-from twisted.python import usage
+from twisted.python import usage, failure
+from twisted.internet import defer
class DumpOptions(usage.Options):
def getSynopsis(self):
"""
return t
+def call(c, *args, **kwargs):
+ # take advantage of the fact that ImmediateReadBucketProxy returns
+ # Deferreds that are already fired
+ results = []
+ d = defer.maybeDeferred(c, *args, **kwargs)
+ d.addCallback(results.append)
+ return results[0]
+
def describe_share(abs_sharefile, si_s, shnum_s, now, out):
from allmydata import uri, storage
from allmydata.mutable.layout import unpack_share
elif struct.unpack(">L", prefix[:4]) == (1,):
# immutable
- sf = storage.ShareFile(abs_sharefile)
+
+ class ImmediateReadBucketProxy(ReadBucketProxy):
+ def __init__(self, sf):
+ self.sf = sf
+ def __repr__(self):
+ return "<ImmediateReadBucketProxy>"
+ def _read(self, offset, size):
+ return defer.succeed(sf.read_share_data(offset, size))
+
# use a ReadBucketProxy to parse the bucket and find the uri extension
- bp = ReadBucketProxy(None)
- offsets = bp._parse_offsets(sf.read_share_data(0, 0x24))
- seek = offsets['uri_extension']
- length = struct.unpack(">L", sf.read_share_data(seek, 4))[0]
- seek += 4
- UEB_data = sf.read_share_data(seek, length)
+ sf = storage.ShareFile(abs_sharefile)
+ bp = ImmediateReadBucketProxy(sf)
+ bp.start()
+
expiration_time = min( [lease.expiration_time
for lease in sf.iter_leases()] )
expiration = max(0, expiration_time - now)
+ UEB_data = call(bp.get_uri_extension)
unpacked = uri.unpack_extension_readable(UEB_data)
+
k = unpacked["needed_shares"]
N = unpacked["total_shares"]
filesize = unpacked["size"]
def catalog_shares(options):
out = options.stdout
+ err = options.stderr
now = time.time()
for d in options.nodedirs:
d = os.path.join(os.path.expanduser(d), "storage/shares")
abs_sharefile = os.path.join(si_dir, shnum_s)
abs_sharefile = os.path.abspath(abs_sharefile)
assert os.path.isfile(abs_sharefile)
- describe_share(abs_sharefile, si_s, shnum_s, now, out)
+ try:
+ describe_share(abs_sharefile, si_s, shnum_s, now,
+ out)
+ except:
+ print >>err, "Error processing %s" % abs_sharefile
+ failure.Failure().printTraceback(err)
return 0
class CorruptShareOptions(usage.Options):
self.failUnless("storage index: nt4fwemuw7flestsezvo2eveke" in output, output)
self.failUnless("fingerprint: 737p57x6737p57x6737p57x6737p57x6737p57x6737p57x6737a" in output, output)
+ def _catalog_shares(self, *basedirs):
+ o = debug.CatalogSharesOptions()
+ o.stdout,o.stderr = StringIO(), StringIO()
+ args = list(basedirs)
+ o.parseOptions(args)
+ debug.catalog_shares(o)
+ out = o.stdout.getvalue()
+ err = o.stderr.getvalue()
+ return out, err
+
+ def test_catalog_shares_error(self):
+ nodedir1 = "cli/test_catalog_shares/node1"
+ sharedir = os.path.join(nodedir1, "storage", "shares", "mq", "mqfblse6m5a6dh45isu2cg7oji")
+ fileutil.make_dirs(sharedir)
+ f = open(os.path.join(sharedir, "8"), "wb")
+ # write a bogus share that looks a little bit like CHK
+ f.write("\x00\x00\x00\x01" + "\xff" * 200) # this triggers an assert
+ f.close()
+
+ nodedir2 = "cli/test_catalog_shares/node2"
+ fileutil.make_dirs(nodedir2)
+
+ # now make sure that the 'catalog-shares' commands survives the error
+ out, err = self._catalog_shares(nodedir1, nodedir2)
+ self.failUnlessEqual(out, "", out)
+ self.failUnless("Error processing " in err, err)
+ self.failUnless(nodedir1 in err, err)
+ self.flushLoggedErrors(AssertionError)
+
+
class CLITestMixin:
def do_cli(self, verb, *args, **kwargs):
nodeargs = [