]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/test/test_util.py
wui: improved columns in welcome page server list
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_util.py
index 4ebcb1b90afb93718ceca901747cca72131859d4..46fa16ee58c2465cb662fce92261b3036870e19a 100644 (file)
@@ -420,6 +420,15 @@ class FileUtil(unittest.TestCase):
         fileutil.rm_dir(basedir)
         fileutil.remove_if_possible(fn) # should survive errors
 
+    def test_write_atomically(self):
+        basedir = "util/FileUtil/test_write_atomically"
+        fileutil.make_dirs(basedir)
+        fn = os.path.join(basedir, "here")
+        fileutil.write_atomically(fn, "one")
+        self.failUnlessEqual(fileutil.read(fn), "one")
+        fileutil.write_atomically(fn, "two", mode="") # non-binary
+        self.failUnlessEqual(fileutil.read(fn), "two")
+
     def test_open_or_create(self):
         basedir = "util/FileUtil/test_open_or_create"
         fileutil.make_dirs(basedir)
@@ -428,7 +437,7 @@ class FileUtil(unittest.TestCase):
         f.write("stuff.")
         f.close()
         f = fileutil.open_or_create(fn)
-        f.seek(0, 2)
+        f.seek(0, os.SEEK_END)
         f.write("more.")
         f.close()
         f = open(fn, "r")
@@ -503,6 +512,25 @@ class FileUtil(unittest.TestCase):
             finally:
                 os.chdir(saved_cwd)
 
+    def test_disk_stats(self):
+        avail = fileutil.get_available_space('.', 2**14)
+        if avail == 0:
+            raise unittest.SkipTest("This test will spuriously fail there is no disk space left.")
+
+        disk = fileutil.get_disk_stats('.', 2**13)
+        self.failUnless(disk['total'] > 0, disk['total'])
+        # we tolerate used==0 for a Travis-CI bug, see #2290
+        self.failUnless(disk['used'] >= 0, disk['used'])
+        self.failUnless(disk['free_for_root'] > 0, disk['free_for_root'])
+        self.failUnless(disk['free_for_nonroot'] > 0, disk['free_for_nonroot'])
+        self.failUnless(disk['avail'] > 0, disk['avail'])
+
+    def test_disk_stats_avail_nonnegative(self):
+        # This test will spuriously fail if you have more than 2^128
+        # bytes of available space on your filesystem.
+        disk = fileutil.get_disk_stats('.', 2**128)
+        self.failUnlessEqual(disk['avail'], 0)
+
 class PollMixinTests(unittest.TestCase):
     def setUp(self):
         self.pm = pollmixin.PollMixin()
@@ -632,11 +660,11 @@ class HashUtilTests(unittest.TestCase):
         h2.update("foo")
         self.failUnlessEqual(h1, h2.digest())
 
-    def test_constant_time_compare(self):
-        self.failUnless(hashutil.constant_time_compare("a", "a"))
-        self.failUnless(hashutil.constant_time_compare("ab", "ab"))
-        self.failIf(hashutil.constant_time_compare("a", "b"))
-        self.failIf(hashutil.constant_time_compare("a", "aa"))
+    def test_timing_safe_compare(self):
+        self.failUnless(hashutil.timing_safe_compare("a", "a"))
+        self.failUnless(hashutil.timing_safe_compare("ab", "ab"))
+        self.failIf(hashutil.timing_safe_compare("a", "b"))
+        self.failIf(hashutil.timing_safe_compare("a", "aa"))
 
     def _testknown(self, hashf, expected_a, *args):
         got = hashf(*args)
@@ -706,7 +734,8 @@ class Abbreviate(unittest.TestCase):
                     (1000*1000*1000, "1.00 GB"),
                     (1000*1000*1000*1000, "1.00 TB"),
                     (1000*1000*1000*1000*1000, "1.00 PB"),
-                    (1234567890123456, "1.23 PB"),
+                    (1000*1000*1000*1000*1000*1000, "1.00 EB"),
+                    (1234567890123456789, "1.23 EB"),
                     ]
         for (x, expected) in tests_si:
             got = abbreviate.abbreviate_space(x, SI=True)
@@ -726,7 +755,8 @@ class Abbreviate(unittest.TestCase):
                           (1024*1024*1024*1024, "1.00 TiB"),
                           (1000*1000*1000*1000*1000, "909.49 TiB"),
                           (1024*1024*1024*1024*1024, "1.00 PiB"),
-                          (1234567890123456, "1.10 PiB"),
+                          (1024*1024*1024*1024*1024*1024, "1.00 EiB"),
+                          (1234567890123456789, "1.07 EiB"),
                     ]
         for (x, expected) in tests_base1024:
             got = abbreviate.abbreviate_space(x, SI=False)
@@ -748,8 +778,19 @@ class Abbreviate(unittest.TestCase):
         self.failUnlessEqual(p("10MiB"), 10*1024*1024)
         self.failUnlessEqual(p("5G"), 5*1000*1000*1000)
         self.failUnlessEqual(p("4GiB"), 4*1024*1024*1024)
+        self.failUnlessEqual(p("3TB"), 3*1000*1000*1000*1000)
+        self.failUnlessEqual(p("3TiB"), 3*1024*1024*1024*1024)
+        self.failUnlessEqual(p("6PB"), 6*1000*1000*1000*1000*1000)
+        self.failUnlessEqual(p("6PiB"), 6*1024*1024*1024*1024*1024)
+        self.failUnlessEqual(p("9EB"), 9*1000*1000*1000*1000*1000*1000)
+        self.failUnlessEqual(p("9EiB"), 9*1024*1024*1024*1024*1024*1024)
+
         e = self.failUnlessRaises(ValueError, p, "12 cubits")
-        self.failUnless("12 cubits" in str(e))
+        self.failUnlessIn("12 cubits", str(e))
+        e = self.failUnlessRaises(ValueError, p, "1 BB")
+        self.failUnlessIn("1 BB", str(e))
+        e = self.failUnlessRaises(ValueError, p, "fhtagn")
+        self.failUnlessIn("fhtagn", str(e))
 
 class Limiter(unittest.TestCase):
     timeout = 480 # This takes longer than 240 seconds on Francois's arm box.
@@ -914,6 +955,44 @@ class TimeFormat(unittest.TestCase):
     def test_parse_date(self):
         self.failUnlessEqual(time_format.parse_date("2010-02-21"), 1266710400)
 
+    def test_format_delta(self):
+        time_1 = 1389812723
+        time_5s_delta = 1389812728
+        time_28m7s_delta = 1389814410
+        time_1h_delta = 1389816323
+        time_1d21h46m49s_delta = 1389977532
+        TIME_FORMAT = "%H:%M:%S %d-%b-%Y"
+        time_1_isostr = time.strftime(TIME_FORMAT, time.localtime(time_1))
+
+        self.failUnlessEqual(
+            time_format.format_delta(time_1, time_5s_delta),
+            (time_1_isostr, '5s'))
+        self.failUnlessEqual(
+            time_format.format_delta(time_1, time_28m7s_delta),
+            (time_1_isostr, '28m7s'))
+        self.failUnlessEqual(
+            time_format.format_delta(time_1, time_1h_delta),
+            (time_1_isostr, '1h0m0s'))
+        self.failUnlessEqual(
+            time_format.format_delta(time_1, time_1d21h46m49s_delta),
+            (time_1_isostr, '1d21h46m49s'))
+
+        # time_1 with a decimal fraction will make the delta 1s less
+        time_1decimal = 1389812723.383963
+
+        self.failUnlessEqual(
+            time_format.format_delta(time_1decimal, time_5s_delta),
+            (time_1_isostr, '4s'))
+        self.failUnlessEqual(
+            time_format.format_delta(time_1decimal, time_28m7s_delta),
+            (time_1_isostr, '28m6s'))
+        self.failUnlessEqual(
+            time_format.format_delta(time_1decimal, time_1h_delta),
+            (time_1_isostr, '59m59s'))
+        self.failUnlessEqual(
+            time_format.format_delta(time_1decimal, time_1d21h46m49s_delta),
+            (time_1_isostr, '1d21h46m48s'))
+
 class CacheDir(unittest.TestCase):
     def test_basic(self):
         basedir = "test_util/CacheDir/test_basic"
@@ -1092,16 +1171,13 @@ class DictUtil(unittest.TestCase):
         ds.discard(2, "c")
         self.failIf(2 in ds)
 
-        ds.union(1, ["a", "e"])
-        ds.union(3, ["f"])
-        self.failUnlessEqual(ds[1], set(["a","e"]))
-        self.failUnlessEqual(ds[3], set(["f"]))
+        ds.add(3, "f")
         ds2 = dictutil.DictOfSets()
         ds2.add(3, "f")
         ds2.add(3, "g")
         ds2.add(4, "h")
         ds.update(ds2)
-        self.failUnlessEqual(ds[1], set(["a","e"]))
+        self.failUnlessEqual(ds[1], set(["a"]))
         self.failUnlessEqual(ds[3], set(["f", "g"]))
         self.failUnlessEqual(ds[4], set(["h"]))
 
@@ -1949,7 +2025,7 @@ class StringSpans(unittest.TestCase):
         self.failUnlessEqual(ds.len(), 0)
         self.failUnlessEqual(list(ds._dump()), [])
         self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 0)
-        s = ds.get_spans()
+        s1 = ds.get_spans()
         self.failUnlessEqual(ds.get(0, 4), None)
         self.failUnlessEqual(ds.pop(0, 4), None)
         ds.remove(0, 4)
@@ -1958,8 +2034,8 @@ class StringSpans(unittest.TestCase):
         self.failUnlessEqual(ds.len(), 4)
         self.failUnlessEqual(list(ds._dump()), [2,3,4,5])
         self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 4)
-        s = ds.get_spans()
-        self.failUnless((2,2) in s)
+        s1 = ds.get_spans()
+        self.failUnless((2,2) in s1)
         self.failUnlessEqual(ds.get(0, 4), None)
         self.failUnlessEqual(ds.pop(0, 4), None)
         self.failUnlessEqual(ds.get(4, 4), None)