X-Git-Url: https://git.rkrishnan.org/?a=blobdiff_plain;f=src%2Fallmydata%2Ftest%2Ftest_util.py;h=e59e14aba6e92c4625896e24c6a20f015d38932e;hb=186f6d4a59a879ee93207a48bca15fbdf2741206;hp=2a5ba19978b5eb5e5f1f78735dcf29a92a3cf179;hpb=38668c9e35a9f27123f05d3726e12263248996e6;p=tahoe-lafs%2Ftahoe-lafs.git diff --git a/src/allmydata/test/test_util.py b/src/allmydata/test/test_util.py index 2a5ba199..e59e14ab 100644 --- a/src/allmydata/test/test_util.py +++ b/src/allmydata/test/test_util.py @@ -15,6 +15,8 @@ from allmydata.util import limiter, time_format, pollmixin, cachedir from allmydata.util import statistics, dictutil, pipeline from allmydata.util import log as tahoe_log from allmydata.util.spans import Spans, overlap, DataSpans +from allmydata.test.common_util import ReallyEqualMixin, TimezoneMixin + class Base32(unittest.TestCase): def test_b2a_matches_Pythons(self): @@ -370,7 +372,7 @@ class Asserts(unittest.TestCase): m = self.should_assert(f, False, othermsg="message2") self.failUnlessEqual("postcondition: othermsg: 'message2' ", m) -class FileUtil(unittest.TestCase): +class FileUtil(ReallyEqualMixin, unittest.TestCase): def mkdir(self, basedir, path, mode=0777): fn = os.path.join(basedir, path) fileutil.make_dirs(fn, mode) @@ -429,34 +431,6 @@ class FileUtil(unittest.TestCase): fileutil.write_atomically(fn, "two", mode="") # non-binary self.failUnlessEqual(fileutil.read(fn), "two") - def test_open_or_create(self): - basedir = "util/FileUtil/test_open_or_create" - fileutil.make_dirs(basedir) - fn = os.path.join(basedir, "here") - f = fileutil.open_or_create(fn) - f.write("stuff.") - f.close() - f = fileutil.open_or_create(fn) - f.seek(0, os.SEEK_END) - f.write("more.") - f.close() - f = open(fn, "r") - data = f.read() - f.close() - self.failUnlessEqual(data, "stuff.more.") - - def test_NamedTemporaryDirectory(self): - basedir = "util/FileUtil/test_NamedTemporaryDirectory" - fileutil.make_dirs(basedir) - td = fileutil.NamedTemporaryDirectory(dir=basedir) - name = td.name - self.failUnless(basedir in name) - self.failUnless(basedir in repr(td)) - self.failUnless(os.path.isdir(name)) - del td - # it is conceivable that we need to force gc here, but I'm not sure - self.failIf(os.path.isdir(name)) - def test_rename(self): basedir = "util/FileUtil/test_rename" fileutil.make_dirs(basedir) @@ -488,11 +462,38 @@ class FileUtil(unittest.TestCase): abspath_cwd = fileutil.abspath_expanduser_unicode(u".") self.failUnless(isinstance(saved_cwd, unicode), saved_cwd) self.failUnless(isinstance(abspath_cwd, unicode), abspath_cwd) - self.failUnlessEqual(abspath_cwd, saved_cwd) + if sys.platform == "win32": + self.failUnlessReallyEqual(abspath_cwd, fileutil.to_windows_long_path(saved_cwd)) + else: + self.failUnlessReallyEqual(abspath_cwd, saved_cwd) + + self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\?\\foo"), u"\\\\?\\foo") + self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\.\\foo"), u"\\\\.\\foo") + self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\server\\foo"), u"\\\\?\\UNC\\server\\foo") + self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"C:\\foo"), u"\\\\?\\C:\\foo") + self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"C:\\foo/bar"), u"\\\\?\\C:\\foo\\bar") # adapted from - self.failUnlessIn(u"foo", fileutil.abspath_expanduser_unicode(u"foo")) + foo = fileutil.abspath_expanduser_unicode(u"foo") + self.failUnless(foo.endswith(u"%sfoo" % (os.path.sep,)), foo) + + foobar = fileutil.abspath_expanduser_unicode(u"bar", base=foo) + self.failUnless(foobar.endswith(u"%sfoo%sbar" % (os.path.sep, os.path.sep)), foobar) + + if sys.platform == "win32": + # This is checking that a drive letter is added for a path without one. + baz = fileutil.abspath_expanduser_unicode(u"\\baz") + self.failUnless(baz.startswith(u"\\\\?\\"), baz) + self.failUnlessReallyEqual(baz[5 :], u":\\baz") + + bar = fileutil.abspath_expanduser_unicode(u"\\bar", base=baz) + self.failUnless(bar.startswith(u"\\\\?\\"), bar) + self.failUnlessReallyEqual(bar[5 :], u":\\bar") + # not u":\\baz\\bar", because \bar is absolute on the current drive. + + self.failUnlessReallyEqual(baz[4], bar[4]) # same drive + self.failIfIn(u"~", fileutil.abspath_expanduser_unicode(u"~")) cwds = ['cwd'] @@ -512,6 +513,41 @@ class FileUtil(unittest.TestCase): finally: os.chdir(saved_cwd) + def test_create_long_path(self): + workdir = u"test_create_long_path" + fileutil.make_dirs(workdir) + long_path = fileutil.abspath_expanduser_unicode(os.path.join(workdir, u'x'*255)) + def _cleanup(): + fileutil.remove(long_path) + self.addCleanup(_cleanup) + + fileutil.write(long_path, "test") + self.failUnless(os.path.exists(long_path)) + self.failUnlessEqual(fileutil.read(long_path), "test") + _cleanup() + self.failIf(os.path.exists(long_path)) + + def _test_windows_expanduser(self, userprofile=None, homedrive=None, homepath=None): + def call_windows_getenv(name): + if name == u"USERPROFILE": return userprofile + if name == u"HOMEDRIVE": return homedrive + if name == u"HOMEPATH": return homepath + self.fail("unexpected argument to call_windows_getenv") + self.patch(fileutil, 'windows_getenv', call_windows_getenv) + + self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100")) + self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~\\foo"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100", u"foo")) + self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~/foo"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100", u"foo")) + self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a"), u"a") + self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a~"), u"a~") + self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a\\~\\foo"), u"a\\~\\foo") + + def test_windows_expanduser_xp(self): + return self._test_windows_expanduser(homedrive=u"C:", homepath=u"\\Documents and Settings\\\u0100") + + def test_windows_expanduser_win7(self): + return self._test_windows_expanduser(userprofile=os.path.join(u"C:", u"\\Documents and Settings\\\u0100")) + def test_disk_stats(self): avail = fileutil.get_available_space('.', 2**14) if avail == 0: @@ -558,7 +594,7 @@ class PollMixinTests(unittest.TestCase): d.addCallbacks(_suc, _err) return d -class DeferredUtilTests(unittest.TestCase): +class DeferredUtilTests(unittest.TestCase, deferredutil.WaitForDelayedCallsMixin): def test_gather_results(self): d1 = defer.Deferred() d2 = defer.Deferred() @@ -598,6 +634,21 @@ class DeferredUtilTests(unittest.TestCase): self.failUnless(isinstance(f, Failure)) self.failUnless(f.check(ValueError)) + def test_wait_for_delayed_calls(self): + """ + This tests that 'wait_for_delayed_calls' does in fact wait for a + delayed call that is active when the test returns. If it didn't, + Trial would report an unclean reactor error for this test. + """ + def _trigger(): + #print "trigger" + pass + reactor.callLater(0.1, _trigger) + + d = defer.succeed(None) + d.addBoth(self.wait_for_delayed_calls) + return d + class HashUtilTests(unittest.TestCase): def test_random_key(self): @@ -867,7 +918,7 @@ class Limiter(unittest.TestCase): d.addCallback(_all_done) return d -class TimeFormat(unittest.TestCase): +class TimeFormat(unittest.TestCase, TimezoneMixin): def test_epoch(self): return self._help_test_epoch() @@ -881,19 +932,12 @@ class TimeFormat(unittest.TestCase): # time_format.iso_utc_time_to_localseconds() breaks if the timezone is # Europe/London. (As soon as this unit test is done then I'll change # that implementation to something that works even in this case...) - origtz = os.environ.get('TZ') - os.environ['TZ'] = "Europe/London" - if hasattr(time, 'tzset'): - time.tzset() - try: - return self._help_test_epoch() - finally: - if origtz is None: - del os.environ['TZ'] - else: - os.environ['TZ'] = origtz - if hasattr(time, 'tzset'): - time.tzset() + + if not self.have_working_tzset(): + raise unittest.SkipTest("This test can't be run on a platform without time.tzset().") + + self.setTimezone("Europe/London") + return self._help_test_epoch() def _help_test_epoch(self): origtzname = time.tzname @@ -955,6 +999,56 @@ class TimeFormat(unittest.TestCase): def test_parse_date(self): self.failUnlessEqual(time_format.parse_date("2010-02-21"), 1266710400) + def test_format_time(self): + self.failUnlessEqual(time_format.format_time(time.gmtime(0)), '1970-01-01 00:00:00') + self.failUnlessEqual(time_format.format_time(time.gmtime(60)), '1970-01-01 00:01:00') + self.failUnlessEqual(time_format.format_time(time.gmtime(60*60)), '1970-01-01 01:00:00') + seconds_per_day = 60*60*24 + leap_years_1970_to_2014_inclusive = ((2012 - 1968) // 4) + self.failUnlessEqual(time_format.format_time(time.gmtime(seconds_per_day*((2015 - 1970)*365+leap_years_1970_to_2014_inclusive))), '2015-01-01 00:00:00') + + def test_format_time_y2038(self): + seconds_per_day = 60*60*24 + leap_years_1970_to_2047_inclusive = ((2044 - 1968) // 4) + try: + self.failUnlessEqual(time_format.format_time(time.gmtime(seconds_per_day*((2048 - 1970)*365+leap_years_1970_to_2047_inclusive))), '2048-01-01 00:00:00') + except unittest.FailTest: + raise unittest.SkipTest("Note: this system cannot handle dates after 2037.") + + def test_format_delta(self): + time_1 = 1389812723 + time_5s_delta = 1389812728 + time_28m7s_delta = 1389814410 + time_1h_delta = 1389816323 + time_1d21h46m49s_delta = 1389977532 + + self.failUnlessEqual( + time_format.format_delta(time_1, time_1), '0s') + + self.failUnlessEqual( + time_format.format_delta(time_1, time_5s_delta), '5s') + self.failUnlessEqual( + time_format.format_delta(time_1, time_28m7s_delta), '28m 7s') + self.failUnlessEqual( + time_format.format_delta(time_1, time_1h_delta), '1h 0m 0s') + self.failUnlessEqual( + time_format.format_delta(time_1, time_1d21h46m49s_delta), '1d 21h 46m 49s') + + self.failUnlessEqual( + time_format.format_delta(time_1d21h46m49s_delta, time_1), '-') + + # time_1 with a decimal fraction will make the delta 1s less + time_1decimal = 1389812723.383963 + + self.failUnlessEqual( + time_format.format_delta(time_1decimal, time_5s_delta), '4s') + self.failUnlessEqual( + time_format.format_delta(time_1decimal, time_28m7s_delta), '28m 6s') + self.failUnlessEqual( + time_format.format_delta(time_1decimal, time_1h_delta), '59m 59s') + self.failUnlessEqual( + time_format.format_delta(time_1decimal, time_1d21h46m49s_delta), '1d 21h 46m 48s') + class CacheDir(unittest.TestCase): def test_basic(self): basedir = "test_util/CacheDir/test_basic"