pulled in assertutil and with it humanreadable from amdlib.util
authorRob Kinninmont <robk@allmydata.com>
Thu, 30 Nov 2006 22:22:25 +0000 (15:22 -0700)
committerRob Kinninmont <robk@allmydata.com>
Thu, 30 Nov 2006 22:22:25 +0000 (15:22 -0700)
amdlib/__init__.py [new file with mode: 0644]
amdlib/util/__init__.py [new file with mode: 0644]
amdlib/util/assertutil.py [new file with mode: 0644]
amdlib/util/humanreadable.py [new file with mode: 0644]

diff --git a/amdlib/__init__.py b/amdlib/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/amdlib/util/__init__.py b/amdlib/util/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/amdlib/util/assertutil.py b/amdlib/util/assertutil.py
new file mode 100644 (file)
index 0000000..368407d
--- /dev/null
@@ -0,0 +1,80 @@
+# Copyright (c) 2003-2006 Bryce "Zooko" Wilcox-O'Hearn
+# mailto:zooko@zooko.com
+# http://zooko.com/
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this work to deal in this work without restriction (including the 
+# rights to use, modify, distribute, sublicense, and/or sell copies)
+
+"""
+Tests useful in assertion checking, prints out nicely formated messages too.
+"""
+
+from humanreadable import hr
+
+import Log
+
+def _assert(___cond=False, *___args, **___kwargs):
+    if ___cond:
+        return True
+    msgbuf=[]
+    if ___args:
+        msgbuf.append("%s %s" % tuple(map(hr, (___args[0], type(___args[0]),))))
+        msgbuf.extend([", %s %s" % tuple(map(hr, (arg, type(arg),))) for arg in ___args[1:]])
+        if ___kwargs:
+            msgbuf.append(", %s: %s %s" % ((___kwargs.items()[0][0],) + tuple(map(hr, (___kwargs.items()[0][1], type(___kwargs.items()[0][1]),)))))
+    else:
+        if ___kwargs:
+            msgbuf.append("%s: %s %s" % ((___kwargs.items()[0][0],) + tuple(map(hr, (___kwargs.items()[0][1], type(___kwargs.items()[0][1]),)))))
+    msgbuf.extend([", %s: %s %s" % tuple(map(hr, (k, v, type(v),))) for k, v in ___kwargs.items()[1:]])
+
+    raise AssertionError, "".join(msgbuf)
+
+    return False
+
+def precondition(___cond=False, *___args, **___kwargs):
+    try:
+        if ___cond:
+            return True
+        msgbuf=["precondition", ]
+        if ___args or ___kwargs:
+            msgbuf.append(": ")
+        if ___args:
+            msgbuf.append("%s %s" % tuple(map(hr, (___args[0], type(___args[0]),))))
+            msgbuf.extend([", %s %s" % tuple(map(hr, (arg, type(arg),))) for arg in ___args[1:]])
+            if ___kwargs:
+                msgbuf.append(", %s: %s %s" % ((___kwargs.items()[0][0],) + tuple(map(hr, (___kwargs.items()[0][1], type(___kwargs.items()[0][1]),)))))
+        else:
+            if ___kwargs:
+                msgbuf.append("%s: %s %s" % ((___kwargs.items()[0][0],) + tuple(map(hr, (___kwargs.items()[0][1], type(___kwargs.items()[0][1]),)))))
+        msgbuf.extend([", %s: %s %s" % tuple(map(hr, (k, v, type(v),))) for k, v in ___kwargs.items()[1:]])
+    except Exception, le:
+        Log.Log(Log.ERR, 'assertutil', "INTERNAL ERROR IN pyutil.assertutil. %s %s %s" % (type(le), repr(le), le.args,))
+        raise le
+    except:
+        Log.Log(Log.ERR, 'assertutil', "INTERNAL ERROR IN pyutil.assertutil.")
+        raise
+
+    raise AssertionError, "".join(msgbuf)
+
+    return False
+
+def postcondition(___cond=False, *___args, **___kwargs):
+    if ___cond:
+        return True
+    msgbuf=["postcondition", ]
+    if ___args or ___kwargs:
+        msgbuf.append(": ")
+    if ___args:
+        msgbuf.append("%s %s" % tuple(map(hr, (___args[0], type(___args[0]),))))
+        msgbuf.extend([", %s %s" % tuple(map(hr, (arg, type(arg),))) for arg in ___args[1:]])
+        if ___kwargs:
+            msgbuf.append(", %s: %s %s" % ((___kwargs.items()[0][0],) + tuple(map(hr, (___kwargs.items()[0][1], type(___kwargs.items()[0][1]),)))))
+    else:
+        if ___kwargs:
+            msgbuf.append("%s: %s %s" % ((___kwargs.items()[0][0],) + tuple(map(hr, (___kwargs.items()[0][1], type(___kwargs.items()[0][1]),)))))
+    msgbuf.extend([", %s: %s %s" % tuple(map(hr, (k, v, type(v),))) for k, v in ___kwargs.items()[1:]])
+
+    raise AssertionError, "".join(msgbuf)
+
+    return False
+
diff --git a/amdlib/util/humanreadable.py b/amdlib/util/humanreadable.py
new file mode 100644 (file)
index 0000000..bae826c
--- /dev/null
@@ -0,0 +1,128 @@
+#  Copyright (c) 2001 Autonomous Zone Industries
+#  Copyright (c) 2002-2005 Bryce "Zooko" Wilcox-O'Hearn
+#  This file is licensed under the
+#    GNU Lesser General Public License v2.1.
+#    See the file COPYING or visit http://www.gnu.org/ for details.
+
+import exceptions, os
+from repr import Repr
+
+class BetterRepr(Repr):
+    def __init__(self):
+        Repr.__init__(self)
+
+        # Note: These levels can get adjusted dynamically!  My goal is to get
+        # more info when printing important debug stuff like exceptions and
+        # stack traces and less info when logging normal events.
+        # --Zooko 2000-10-14
+        if 0: # DEBUG MODE
+            self.maxlevel = 24
+            self.maxdict = 24
+            self.maxlist = 24
+            self.maxtuple = 24
+            self.maxstring = 800
+            self.maxother = 800
+        else:
+            self.maxlevel = 6
+            self.maxdict = 6
+            self.maxlist = 6
+            self.maxtuple = 6
+            self.maxstring = 300
+            self.maxother = 300
+
+    def repr_function(self, obj, level):
+        if hasattr(obj, 'func_code'):
+            return '<' + obj.func_name + '() at ' + os.path.basename(obj.func_code.co_filename) + ':' + str(obj.func_code.co_firstlineno) + '>'
+        else:
+            return '<' + obj.func_name + '() at (builtin)'
+
+    def repr_instance_method(self, obj, level):
+        if hasattr(obj, 'func_code'):
+            return '<' + obj.im_class.__name__ + '.' + obj.im_func.__name__ + '() at ' + os.path.basename(obj.im_func.func_code.co_filename) + ':' + str(obj.im_func.func_code.co_firstlineno) + '>'
+        else:
+            return '<' + obj.im_class.__name__ + '.' + obj.im_func.__name__ + '() at (builtin)'
+
+    def repr_long(self, obj, level):
+        s = `obj` # XXX Hope this isn't too slow...
+        if len(s) > self.maxlong:
+            i = max(0, (self.maxlong-3)/2)
+            j = max(0, self.maxlong-3-i)
+            s = s[:i] + '...' + s[len(s)-j:]
+        if s[-1] == 'L':
+            return s[:-1]
+        return s
+
+    def repr_instance(self, obj, level):
+        """
+        If it is an instance of Exception, format it nicely (trying to emulate
+        the format that you see when an exception is actually raised, plus
+        bracketing '<''s).  If it is an instance of dict call self.repr_dict()
+        on it.  If it is an instance of list call self.repr_list() on it. Else
+        call Repr.repr_instance().
+        """
+        if isinstance(obj, exceptions.Exception):
+            # Don't cut down exception strings so much.
+            tms = self.maxstring
+            self.maxstring = max(512, tms * 4)
+            tml = self.maxlist
+            self.maxlist = max(12, tml * 4)
+            try:
+                if hasattr(obj, 'args'):
+                    if len(obj.args) == 1:
+                        return '<' + obj.__class__.__name__ + ': ' + self.repr1(obj.args[0], level-1) + '>'
+                    else:
+                        return '<' + obj.__class__.__name__ + ': ' + self.repr1(obj.args, level-1) + '>'
+                else:
+                    return '<' + obj.__class__.__name__ + '>'
+            finally:
+                self.maxstring = tms
+                self.maxlist = tml
+
+        if isinstance(obj, dict):
+            return self.repr_dict(obj, level)
+
+        if isinstance(obj, list):
+            return self.repr_list(obj, level)
+
+        return Repr.repr_instance(self, obj, level)
+
+    def repr_list(self, obj, level):
+        """
+        copied from standard repr.py and fixed to work on multithreadedly mutating lists.
+        """
+        if level <= 0: return '[...]'
+        n = len(obj)
+        myl = obj[:min(n, self.maxlist)]
+        s = ''
+        for item in myl:
+            entry = self.repr1(item, level-1)
+            if s: s = s + ', '
+            s = s + entry
+        if n > self.maxlist: s = s + ', ...'
+        return '[' + s + ']'
+
+    def repr_dict(self, obj, level):
+        """
+        copied from standard repr.py and fixed to work on multithreadedly mutating dicts.
+        """
+        if level <= 0: return '{...}'
+        s = ''
+        n = len(obj)
+        items = obj.items()[:min(n, self.maxdict)]
+        items.sort()
+        for key, val in items:
+            entry = self.repr1(key, level-1) + ':' + self.repr1(val, level-1)
+            if s: s = s + ', '
+            s = s + entry
+        if n > self.maxdict: s = s + ', ...'
+        return '{' + s + '}'
+
+# This object can be changed by other code updating this module's "brepr"
+# variables.  This is so that (a) code can use humanreadable with
+# "from humanreadable import hr; hr(mything)", and (b) code can override
+# humanreadable to provide application-specific human readable output
+# (e.g. libbase32's base32id.AbbrevRepr).
+brepr = BetterRepr()
+
+def hr(x):
+    return brepr.repr(x)