]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/test/test_util.py
Allow abspath_expanduser_unicode to optionally produce paths without the long-path...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_util.py
index b527771d76f2520e3e5ee7a99debf8707954b705..606dc6c5ab004f304b4ac913ffe93017d79e2a2b 100644 (file)
@@ -15,6 +15,8 @@ from allmydata.util import limiter, time_format, pollmixin, cachedir
 from allmydata.util import statistics, dictutil, pipeline
 from allmydata.util import log as tahoe_log
 from allmydata.util.spans import Spans, overlap, DataSpans
+from allmydata.test.common_util import ReallyEqualMixin
+
 
 class Base32(unittest.TestCase):
     def test_b2a_matches_Pythons(self):
@@ -370,7 +372,7 @@ class Asserts(unittest.TestCase):
         m = self.should_assert(f, False, othermsg="message2")
         self.failUnlessEqual("postcondition: othermsg: 'message2' <type 'str'>", m)
 
-class FileUtil(unittest.TestCase):
+class FileUtil(ReallyEqualMixin, unittest.TestCase):
     def mkdir(self, basedir, path, mode=0777):
         fn = os.path.join(basedir, path)
         fileutil.make_dirs(fn, mode)
@@ -429,34 +431,6 @@ class FileUtil(unittest.TestCase):
         fileutil.write_atomically(fn, "two", mode="") # non-binary
         self.failUnlessEqual(fileutil.read(fn), "two")
 
-    def test_open_or_create(self):
-        basedir = "util/FileUtil/test_open_or_create"
-        fileutil.make_dirs(basedir)
-        fn = os.path.join(basedir, "here")
-        f = fileutil.open_or_create(fn)
-        f.write("stuff.")
-        f.close()
-        f = fileutil.open_or_create(fn)
-        f.seek(0, os.SEEK_END)
-        f.write("more.")
-        f.close()
-        f = open(fn, "r")
-        data = f.read()
-        f.close()
-        self.failUnlessEqual(data, "stuff.more.")
-
-    def test_NamedTemporaryDirectory(self):
-        basedir = "util/FileUtil/test_NamedTemporaryDirectory"
-        fileutil.make_dirs(basedir)
-        td = fileutil.NamedTemporaryDirectory(dir=basedir)
-        name = td.name
-        self.failUnless(basedir in name)
-        self.failUnless(basedir in repr(td))
-        self.failUnless(os.path.isdir(name))
-        del td
-        # it is conceivable that we need to force gc here, but I'm not sure
-        self.failIf(os.path.isdir(name))
-
     def test_rename(self):
         basedir = "util/FileUtil/test_rename"
         fileutil.make_dirs(basedir)
@@ -486,14 +460,55 @@ class FileUtil(unittest.TestCase):
 
         saved_cwd = os.path.normpath(os.getcwdu())
         abspath_cwd = fileutil.abspath_expanduser_unicode(u".")
+        abspath_cwd_notlong = fileutil.abspath_expanduser_unicode(u".", long_path=False)
         self.failUnless(isinstance(saved_cwd, unicode), saved_cwd)
         self.failUnless(isinstance(abspath_cwd, unicode), abspath_cwd)
-        self.failUnlessEqual(abspath_cwd, saved_cwd)
+        if sys.platform == "win32":
+            self.failUnlessReallyEqual(abspath_cwd, fileutil.to_windows_long_path(saved_cwd))
+        else:
+            self.failUnlessReallyEqual(abspath_cwd, saved_cwd)
+        self.failUnlessReallyEqual(abspath_cwd_notlong, saved_cwd)
+
+        self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\?\\foo"), u"\\\\?\\foo")
+        self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\.\\foo"), u"\\\\.\\foo")
+        self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\server\\foo"), u"\\\\?\\UNC\\server\\foo")
+        self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"C:\\foo"), u"\\\\?\\C:\\foo")
+        self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"C:\\foo/bar"), u"\\\\?\\C:\\foo\\bar")
 
         # adapted from <http://svn.python.org/view/python/branches/release26-maint/Lib/test/test_posixpath.py?view=markup&pathrev=78279#test_abspath>
 
-        self.failUnlessIn(u"foo", fileutil.abspath_expanduser_unicode(u"foo"))
+        foo = fileutil.abspath_expanduser_unicode(u"foo")
+        self.failUnless(foo.endswith(u"%sfoo" % (os.path.sep,)), foo)
+
+        foobar = fileutil.abspath_expanduser_unicode(u"bar", base=foo)
+        self.failUnless(foobar.endswith(u"%sfoo%sbar" % (os.path.sep, os.path.sep)), foobar)
+
+        if sys.platform == "win32":
+            # This is checking that a drive letter is added for a path without one.
+            baz = fileutil.abspath_expanduser_unicode(u"\\baz")
+            self.failUnless(baz.startswith(u"\\\\?\\"), baz)
+            self.failUnlessReallyEqual(baz[5 :], u":\\baz")
+
+            bar = fileutil.abspath_expanduser_unicode(u"\\bar", base=baz)
+            self.failUnless(bar.startswith(u"\\\\?\\"), bar)
+            self.failUnlessReallyEqual(bar[5 :], u":\\bar")
+            # not u":\\baz\\bar", because \bar is absolute on the current drive.
+
+            self.failUnlessReallyEqual(baz[4], bar[4])  # same drive
+
+            baz_notlong = fileutil.abspath_expanduser_unicode(u"\\baz", long_path=False)
+            self.failIf(baz_notlong.startswith(u"\\\\?\\"), baz_notlong)
+            self.failUnlessReallyEqual(baz_notlong[1 :], u":\\baz")
+
+            bar_notlong = fileutil.abspath_expanduser_unicode(u"\\bar", base=baz_notlong, long_path=False)
+            self.failIf(bar_notlong.startswith(u"\\\\?\\"), bar_notlong)
+            self.failUnlessReallyEqual(bar_notlong[1 :], u":\\bar")
+            # not u":\\baz\\bar", because \bar is absolute on the current drive.
+
+            self.failUnlessReallyEqual(baz_notlong[0], bar_notlong[0])  # same drive
+
         self.failIfIn(u"~", fileutil.abspath_expanduser_unicode(u"~"))
+        self.failIfIn(u"~", fileutil.abspath_expanduser_unicode(u"~", long_path=False))
 
         cwds = ['cwd']
         try:
@@ -509,9 +524,47 @@ class FileUtil(unittest.TestCase):
                 for upath in (u'', u'fuu', u'f\xf9\xf9', u'/fuu', u'U:\\', u'~'):
                     uabspath = fileutil.abspath_expanduser_unicode(upath)
                     self.failUnless(isinstance(uabspath, unicode), uabspath)
+
+                    uabspath_notlong = fileutil.abspath_expanduser_unicode(upath, long_path=False)
+                    self.failUnless(isinstance(uabspath_notlong, unicode), uabspath_notlong)
             finally:
                 os.chdir(saved_cwd)
 
+    def test_create_long_path(self):
+        workdir = u"test_create_long_path"
+        fileutil.make_dirs(workdir)
+        long_path = fileutil.abspath_expanduser_unicode(os.path.join(workdir, u'x'*255))
+        def _cleanup():
+            fileutil.remove(long_path)
+        self.addCleanup(_cleanup)
+
+        fileutil.write(long_path, "test")
+        self.failUnless(os.path.exists(long_path))
+        self.failUnlessEqual(fileutil.read(long_path), "test")
+        _cleanup()
+        self.failIf(os.path.exists(long_path))
+
+    def _test_windows_expanduser(self, userprofile=None, homedrive=None, homepath=None):
+        def call_windows_getenv(name):
+            if name == u"USERPROFILE": return userprofile
+            if name == u"HOMEDRIVE":   return homedrive
+            if name == u"HOMEPATH":    return homepath
+            self.fail("unexpected argument to call_windows_getenv")
+        self.patch(fileutil, 'windows_getenv', call_windows_getenv)
+
+        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100"))
+        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~\\foo"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100", u"foo"))
+        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~/foo"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100", u"foo"))
+        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a"), u"a")
+        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a~"), u"a~")
+        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a\\~\\foo"), u"a\\~\\foo")
+
+    def test_windows_expanduser_xp(self):
+        return self._test_windows_expanduser(homedrive=u"C:", homepath=u"\\Documents and Settings\\\u0100")
+
+    def test_windows_expanduser_win7(self):
+        return self._test_windows_expanduser(userprofile=os.path.join(u"C:", u"\\Documents and Settings\\\u0100"))
+
     def test_disk_stats(self):
         avail = fileutil.get_available_space('.', 2**14)
         if avail == 0:
@@ -519,7 +572,8 @@ class FileUtil(unittest.TestCase):
 
         disk = fileutil.get_disk_stats('.', 2**13)
         self.failUnless(disk['total'] > 0, disk['total'])
-        self.failUnless(disk['used'] > 0, disk['used'])
+        # we tolerate used==0 for a Travis-CI bug, see #2290
+        self.failUnless(disk['used'] >= 0, disk['used'])
         self.failUnless(disk['free_for_root'] > 0, disk['free_for_root'])
         self.failUnless(disk['free_for_nonroot'] > 0, disk['free_for_nonroot'])
         self.failUnless(disk['avail'] > 0, disk['avail'])
@@ -557,7 +611,7 @@ class PollMixinTests(unittest.TestCase):
         d.addCallbacks(_suc, _err)
         return d
 
-class DeferredUtilTests(unittest.TestCase):
+class DeferredUtilTests(unittest.TestCase, deferredutil.WaitForDelayedCallsMixin):
     def test_gather_results(self):
         d1 = defer.Deferred()
         d2 = defer.Deferred()
@@ -597,6 +651,21 @@ class DeferredUtilTests(unittest.TestCase):
         self.failUnless(isinstance(f, Failure))
         self.failUnless(f.check(ValueError))
 
+    def test_wait_for_delayed_calls(self):
+        """
+        This tests that 'wait_for_delayed_calls' does in fact wait for a
+        delayed call that is active when the test returns. If it didn't,
+        Trial would report an unclean reactor error for this test.
+        """
+        def _trigger():
+            #print "trigger"
+            pass
+        reactor.callLater(0.1, _trigger)
+
+        d = defer.succeed(None)
+        d.addBoth(self.wait_for_delayed_calls)
+        return d
+
 class HashUtilTests(unittest.TestCase):
 
     def test_random_key(self):