]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
catalog-shares command: tolerate errors, log them to stderr, handle v2-immutable...
authorBrian Warner <warner@allmydata.com>
Wed, 29 Oct 2008 22:10:10 +0000 (15:10 -0700)
committerBrian Warner <warner@allmydata.com>
Wed, 29 Oct 2008 22:10:10 +0000 (15:10 -0700)
src/allmydata/scripts/debug.py
src/allmydata/test/test_cli.py

index eac5ea1d58da83f57ef3d60bbe8d51ed94370049..29e3e43fe9541c5d2ca97cef9b3090714f3afc1d 100644 (file)
@@ -2,7 +2,8 @@
 # do not import any allmydata modules at this level. Do that from inside
 # individual functions instead.
 import struct, time, os
-from twisted.python import usage
+from twisted.python import usage, failure
+from twisted.internet import defer
 
 class DumpOptions(usage.Options):
     def getSynopsis(self):
@@ -515,6 +516,14 @@ useful for purpose.
 """
         return t
 
+def call(c, *args, **kwargs):
+    # take advantage of the fact that ImmediateReadBucketProxy returns
+    # Deferreds that are already fired
+    results = []
+    d = defer.maybeDeferred(c, *args, **kwargs)
+    d.addCallback(results.append)
+    return results[0]
+
 def describe_share(abs_sharefile, si_s, shnum_s, now, out):
     from allmydata import uri, storage
     from allmydata.mutable.layout import unpack_share
@@ -571,19 +580,27 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out):
     elif struct.unpack(">L", prefix[:4]) == (1,):
         # immutable
 
-        sf = storage.ShareFile(abs_sharefile)
+
+        class ImmediateReadBucketProxy(ReadBucketProxy):
+            def __init__(self, sf):
+                self.sf = sf
+            def __repr__(self):
+                return "<ImmediateReadBucketProxy>"
+            def _read(self, offset, size):
+                return defer.succeed(sf.read_share_data(offset, size))
+
         # use a ReadBucketProxy to parse the bucket and find the uri extension
-        bp = ReadBucketProxy(None)
-        offsets = bp._parse_offsets(sf.read_share_data(0, 0x24))
-        seek = offsets['uri_extension']
-        length = struct.unpack(">L", sf.read_share_data(seek, 4))[0]
-        seek += 4
-        UEB_data = sf.read_share_data(seek, length)
+        sf = storage.ShareFile(abs_sharefile)
+        bp = ImmediateReadBucketProxy(sf)
+        bp.start()
+
         expiration_time = min( [lease.expiration_time
                                 for lease in sf.iter_leases()] )
         expiration = max(0, expiration_time - now)
 
+        UEB_data = call(bp.get_uri_extension)
         unpacked = uri.unpack_extension_readable(UEB_data)
+
         k = unpacked["needed_shares"]
         N = unpacked["total_shares"]
         filesize = unpacked["size"]
@@ -601,6 +618,7 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out):
 
 def catalog_shares(options):
     out = options.stdout
+    err = options.stderr
     now = time.time()
     for d in options.nodedirs:
         d = os.path.join(os.path.expanduser(d), "storage/shares")
@@ -620,7 +638,12 @@ def catalog_shares(options):
                         abs_sharefile = os.path.join(si_dir, shnum_s)
                         abs_sharefile = os.path.abspath(abs_sharefile)
                         assert os.path.isfile(abs_sharefile)
-                        describe_share(abs_sharefile, si_s, shnum_s, now, out)
+                        try:
+                            describe_share(abs_sharefile, si_s, shnum_s, now,
+                                           out)
+                        except:
+                            print >>err, "Error processing %s" % abs_sharefile
+                            failure.Failure().printTraceback(err)
     return 0
 
 class CorruptShareOptions(usage.Options):
index 3f65079d8fc88e9bafaf016b90d39d7b38839e69..6f72f2505ea634c55b6e60f8d5e2431907eea267 100644 (file)
@@ -217,6 +217,36 @@ class CLI(unittest.TestCase):
         self.failUnless("storage index: nt4fwemuw7flestsezvo2eveke" in output, output)
         self.failUnless("fingerprint: 737p57x6737p57x6737p57x6737p57x6737p57x6737p57x6737a" in output, output)
 
+    def _catalog_shares(self, *basedirs):
+        o = debug.CatalogSharesOptions()
+        o.stdout,o.stderr = StringIO(), StringIO()
+        args = list(basedirs)
+        o.parseOptions(args)
+        debug.catalog_shares(o)
+        out = o.stdout.getvalue()
+        err = o.stderr.getvalue()
+        return out, err
+
+    def test_catalog_shares_error(self):
+        nodedir1 = "cli/test_catalog_shares/node1"
+        sharedir = os.path.join(nodedir1, "storage", "shares", "mq", "mqfblse6m5a6dh45isu2cg7oji")
+        fileutil.make_dirs(sharedir)
+        f = open(os.path.join(sharedir, "8"), "wb")
+        # write a bogus share that looks a little bit like CHK
+        f.write("\x00\x00\x00\x01" + "\xff" * 200) # this triggers an assert
+        f.close()
+
+        nodedir2 = "cli/test_catalog_shares/node2"
+        fileutil.make_dirs(nodedir2)
+
+        # now make sure that the 'catalog-shares' commands survives the error
+        out, err = self._catalog_shares(nodedir1, nodedir2)
+        self.failUnlessEqual(out, "", out)
+        self.failUnless("Error processing " in err, err)
+        self.failUnless(nodedir1 in err, err)
+        self.flushLoggedErrors(AssertionError)
+
+
 class CLITestMixin:
     def do_cli(self, verb, *args, **kwargs):
         nodeargs = [