DeepCheckAndRepairResults
from allmydata.monitor import Monitor
from allmydata.util import hashutil, mathutil, base32, log
-from allmydata.util.stringutils import quote_output
+from allmydata.util.encodingutil import quote_output
from allmydata.util.assertutil import precondition
from allmydata.util.netstring import netstring, split_netstring
from allmydata.util.consumer import download_to_data
import os.path, re, sys, fnmatch
from twisted.python import usage
from allmydata.scripts.common import BaseOptions, get_aliases
-from allmydata.util.stringutils import argv_to_unicode
+from allmydata.util.encodingutil import argv_to_unicode
NODEURL_RE=re.compile("http(s?)://([^:]*)(:([1-9][0-9]*))?")
import os, sys, urllib
import codecs
from twisted.python import usage
-from allmydata.util.stringutils import unicode_to_url, quote_output
+from allmydata.util.encodingutil import unicode_to_url, quote_output
from allmydata.util.assertutil import precondition
class BaseOptions:
import urlparse, httplib
import allmydata # for __full_version__
-from allmydata.util.stringutils import quote_output
+from allmydata.util.encodingutil import quote_output
from allmydata.scripts.common import TahoeError
UnknownAliasError
from allmydata.scripts.common_http import do_http, format_http_error
from allmydata.util import base32
-from allmydata.util.stringutils import quote_output, is_printable_ascii
+from allmydata.util.encodingutil import quote_output, is_printable_ascii
import urllib
import simplejson
from allmydata.scripts.common_http import do_http, check_http_error
from allmydata.scripts.common import get_aliases
from allmydata.util.fileutil import move_into_place
-from allmydata.util.stringutils import unicode_to_output, quote_output
+from allmydata.util.encodingutil import unicode_to_output, quote_output
def add_line_to_aliasfile(aliasfile, alias, cap):
from allmydata.scripts.common_http import do_http, HTTPError, format_http_error
from allmydata.util import time_format
from allmydata.scripts import backupdb
-from allmydata.util.stringutils import listdir_unicode, open_unicode, quote_output, to_str
+from allmydata.util.encodingutil import quote_output, to_str
+from allmydata.util.fileutil import open_expanduser
from allmydata.util.assertutil import precondition
compare_contents = {} # childname -> rocap
try:
- children = listdir_unicode(localpath)
+ children = os.listdir(localpath)
except EnvironmentError:
self.directories_skipped += 1
self.warn("WARNING: permission denied on directory %s" % quote_output(localpath))
children = []
+ except (UnicodeEncodeError, UnicodeDecodeError):
+ self.directories_skipped += 1
+ self.warn("WARNING: could not list directory %s due to an encoding error" % quote_output(localpath))
+ children = []
for child in self.options.filter_listdir(children):
assert isinstance(child, unicode), child
if must_upload:
self.verboseprint("uploading %s.." % quote_output(childpath))
- infileobj = open_unicode(childpath, "rb")
+ infileobj = open_expanduser(childpath, "rb")
url = self.options['node-url'] + "uri"
resp = do_http("PUT", url, infileobj)
if resp.status not in (200, 201):
from allmydata.scripts.common import get_alias, DEFAULT_ALIAS, escape_path, \
UnknownAliasError
from allmydata.scripts.common_http import do_http, format_http_error
-from allmydata.util.stringutils import quote_output, quote_path
+from allmydata.util.encodingutil import quote_output, quote_path
class Checker:
pass
DefaultAliasMarker, TahoeError
from allmydata.scripts.common_http import do_http, HTTPError
from allmydata import uri
-from allmydata.util.stringutils import unicode_to_url, listdir_unicode, open_unicode, \
- abspath_expanduser_unicode, quote_output, to_str
+from allmydata.util.encodingutil import unicode_to_url, quote_output, to_str
+from allmydata.util import fileutil
+from allmydata.util.fileutil import open_expanduser, abspath_expanduser
from allmydata.util.assertutil import precondition
-def _put_local_file(pathname, inf):
- # TODO: create temporary file and move into place?
- # TODO: move this to fileutil.
- outf = open_unicode(pathname, "wb")
- try:
- while True:
- data = inf.read(32768)
- if not data:
- break
- outf.write(data)
- finally:
- outf.close()
-
-
class MissingSourceError(TahoeError):
def __init__(self, name):
TahoeError.__init__(self, "No such file or directory %s" % quote_output(name))
return True
def open(self, caps_only):
- return open_unicode(self.pathname, "rb")
+ return open_expanduser(self.pathname, "rb")
class LocalFileTarget:
self.pathname = pathname
def put_file(self, inf):
- _put_local_file(self.pathname, inf)
+ fileutil.put_file(self.pathname, inf)
class LocalMissingTarget:
self.pathname = pathname
def put_file(self, inf):
- _put_local_file(self.pathname, inf)
+ fileutil.put_file(self.pathname, inf)
class LocalDirectorySource:
if self.children is not None:
return
self.children = {}
- children = listdir_unicode(self.pathname)
+ children = os.listdir(self.pathname)
for i,n in enumerate(children):
self.progressfunc("examining %d of %d" % (i, len(children)))
pn = os.path.join(self.pathname, n)
if self.children is not None:
return
self.children = {}
- children = listdir_unicode(self.pathname)
+ children = os.listdir(self.pathname)
for i,n in enumerate(children):
self.progressfunc("examining %d of %d" % (i, len(children)))
n = unicode(n)
def put_file(self, name, inf):
precondition(isinstance(name, unicode), name)
pathname = os.path.join(self.pathname, name)
- _put_local_file(pathname, inf)
+ fileutil.put_file(pathname, inf)
def set_children(self):
pass
rootcap, path = get_alias(self.aliases, destination_spec, None)
if rootcap == DefaultAliasMarker:
# no alias, so this is a local file
- pathname = abspath_expanduser_unicode(path.decode('utf-8'))
+ pathname = abspath_expanduser(path.decode('utf-8'))
if not os.path.exists(pathname):
t = LocalMissingTarget(pathname)
elif os.path.isdir(pathname):
rootcap, path = get_alias(self.aliases, source_spec, None)
if rootcap == DefaultAliasMarker:
# no alias, so this is a local file
- pathname = abspath_expanduser_unicode(path.decode('utf-8'))
+ pathname = abspath_expanduser(path.decode('utf-8'))
name = os.path.basename(pathname)
if not os.path.exists(pathname):
raise MissingSourceError(source_spec)
from allmydata.scripts.common import get_alias, DEFAULT_ALIAS, escape_path, \
UnknownAliasError
from allmydata.scripts.common_http import do_http, format_http_error
-from allmydata.util.stringutils import open_unicode
+from allmydata.util.fileutil import open_expanduser
def get(options):
nodeurl = options['node-url']
resp = do_http("GET", url)
if resp.status in (200, 201,):
if to_file:
- outf = open_unicode(to_file, "wb")
+ outf = open_expanduser(to_file, "wb")
else:
outf = stdout
while True:
from allmydata.scripts.common import get_alias, DEFAULT_ALIAS, escape_path, \
UnknownAliasError
from allmydata.scripts.common_http import do_http, format_http_error
-from allmydata.util.stringutils import unicode_to_output, quote_output, is_printable_ascii, to_str
+from allmydata.util.encodingutil import unicode_to_output, quote_output, is_printable_ascii, to_str
def list(options):
nodeurl = options['node-url']
from allmydata.scripts.common import get_alias, DEFAULT_ALIAS, escape_path, \
UnknownAliasError
from allmydata.scripts.common_http import do_http, format_http_error
-from allmydata.util.stringutils import quote_output, quote_path
+from allmydata.util.encodingutil import quote_output, quote_path
class FakeTransport:
disconnecting = False
import urllib
from allmydata.scripts.common_http import do_http, check_http_error
from allmydata.scripts.common import get_alias, DEFAULT_ALIAS, UnknownAliasError
-from allmydata.util.stringutils import quote_output
+from allmydata.util.encodingutil import quote_output
def mkdir(options):
nodeurl = options['node-url']
from allmydata.scripts.common import get_alias, DEFAULT_ALIAS, escape_path, \
UnknownAliasError
from allmydata.scripts.common_http import do_http, format_http_error
-from allmydata.util.stringutils import to_str
+from allmydata.util.encodingutil import to_str
# this script is used for both 'mv' and 'ln'
from allmydata.scripts.common_http import do_http, format_http_success, format_http_error
from allmydata.scripts.common import get_alias, DEFAULT_ALIAS, escape_path, \
UnknownAliasError
-from allmydata.util.stringutils import quote_output, open_unicode
+from allmydata.util.encodingutil import quote_output
+from allmydata.util.fileutil import open_expanduser
def put(options):
"""
if mutable:
url += "?mutable=true"
if from_file:
- infileobj = open_unicode(from_file, "rb")
+ infileobj = open_expanduser(from_file, "rb")
else:
# do_http() can't use stdin directly: for one thing, we need a
# Content-Length field. So we currently must copy it.
from twisted.trial import unittest
from allmydata.util import fileutil
-from allmydata.util.stringutils import listdir_unicode, get_filesystem_encoding, unicode_platform
+from allmydata.util.encodingutil import get_filesystem_encoding, unicode_platform
from allmydata.util.assertutil import precondition
from allmydata.scripts import backupdb
self.failUnless(bdb)
self.writeto(u"f\u00f6\u00f6.txt", "foo.txt")
- files = [fn for fn in listdir_unicode(unicode(basedir)) if fn.endswith(".txt")]
+ files = [fn for fn in os.listdir(unicode(basedir)) if fn.endswith(".txt")]
self.failUnlessEqual(len(files), 1)
foo_fn = os.path.join(basedir, files[0])
#print foo_fn, type(foo_fn)
from twisted.python import usage
from allmydata.util.assertutil import precondition
-from allmydata.util.stringutils import listdir_unicode, open_unicode, unicode_platform, \
- quote_output, get_output_encoding, get_argv_encoding, get_filesystem_encoding, \
+from allmydata.util.encodingutil import unicode_platform, quote_output, \
+ get_output_encoding, get_argv_encoding, get_filesystem_encoding, \
unicode_to_output, to_str
timeout = 480 # deep_check takes 360s on Zandr's linksys box, others take > 240s
fileutil.make_dirs(basedir)
for name in filenames:
- open_unicode(os.path.join(unicode(basedir), name), "wb").close()
+ open(os.path.join(unicode(basedir), name), "wb").close()
- for file in listdir_unicode(unicode(basedir)):
+ for file in os.listdir(unicode(basedir)):
self.failUnlessIn(normalize(file), filenames)
rel_fn = os.path.join(unicode(self.basedir), u"Ã trier.txt")
# we make the file small enough to fit in a LIT file, for speed
DATA = "short file"
- f = open_unicode(rel_fn, "wb")
- try:
- f.write(DATA)
- finally:
- f.close()
+ fileutil.write(rel_fn, DATA)
d = self.do_cli("create-alias", "tahoe")
self.set_up_grid()
DATA1 = "unicode file content"
- f = open_unicode(fn1, "wb")
- try:
- f.write(DATA1)
- finally:
- f.close()
+ fileutil.write(fn1, DATA1)
fn2 = os.path.join(self.basedir, "Metallica")
DATA2 = "non-unicode file content"
--- /dev/null
+
+lumiere_nfc = u"lumi\u00E8re"
+Artonwall_nfc = u"\u00C4rtonwall.mp3"
+Artonwall_nfd = u"A\u0308rtonwall.mp3"
+
+TEST_FILENAMES = (
+ Artonwall_nfc,
+ u'test_file',
+ u'Blah blah.txt',
+)
+
+# The following main helps to generate a test class for other operating
+# systems.
+
+if __name__ == "__main__":
+ import sys
+ import platform
+
+ if len(sys.argv) != 2:
+ print "Usage: %s lumi<e-grave>re" % sys.argv[0]
+ sys.exit(1)
+
+ print
+ print "class MyWeirdOS(StringUtils, unittest.TestCase):"
+ print " uname = '%s'" % ' '.join(platform.uname())
+ if sys.platform != "win32":
+ print " argv = %s" % repr(sys.argv[1])
+ print " platform = '%s'" % sys.platform
+ print " filesystem_encoding = '%s'" % sys.getfilesystemencoding()
+ print " output_encoding = '%s'" % sys.stdout.encoding
+ print " argv_encoding = '%s'" % (sys.platform == "win32" and 'ascii' or sys.stdout.encoding)
+ print
+
+ sys.exit(0)
+
+from twisted.trial import unittest
+from mock import patch
+import sys
+
+from allmydata.test.common_util import ReallyEqualMixin
+from allmydata.util.encodingutil import argv_to_unicode, unicode_to_url, \
+ unicode_to_output, unicode_platform, get_output_encoding, _reload
+
+from twisted.python import usage
+
+class StringUtilsErrors(ReallyEqualMixin, unittest.TestCase):
+ def tearDown(self):
+ _reload()
+
+ @patch('sys.stdout')
+ def test_get_output_encoding(self, mock_stdout):
+ mock_stdout.encoding = 'UTF-8'
+ _reload()
+ self.failUnlessReallyEqual(get_output_encoding(), 'utf-8')
+
+ mock_stdout.encoding = 'cp65001'
+ _reload()
+ self.failUnlessReallyEqual(get_output_encoding(), 'utf-8')
+
+ mock_stdout.encoding = 'koi8-r'
+ _reload()
+ self.failUnlessReallyEqual(get_output_encoding(), 'koi8-r')
+
+ mock_stdout.encoding = 'nonexistent_encoding'
+ self.failUnlessRaises(AssertionError, _reload)
+
+ # TODO: mock_stdout.encoding = None
+
+ @patch('sys.stdout')
+ def test_argv_to_unicode(self, mock):
+ mock.encoding = 'utf-8'
+ _reload()
+
+ self.failUnlessRaises(usage.UsageError,
+ argv_to_unicode,
+ lumiere_nfc.encode('latin1'))
+
+ @patch('sys.stdout')
+ def test_unicode_to_output(self, mock):
+ # Encoding koi8-r cannot represent e-grave
+ mock.encoding = 'koi8-r'
+ _reload()
+ self.failUnlessRaises(UnicodeEncodeError, unicode_to_output, lumiere_nfc)
+
+
+class StringUtils(ReallyEqualMixin):
+ def setUp(self):
+ # Mock sys.platform because unicode_platform() uses it
+ self.original_platform = sys.platform
+ sys.platform = self.platform
+
+ def tearDown(self):
+ sys.platform = self.original_platform
+ _reload()
+
+ @patch('sys.stdout')
+ def test_argv_to_unicode(self, mock):
+ if 'argv' not in dir(self):
+ return
+
+ mock.encoding = self.output_encoding
+ argu = lumiere_nfc
+ argv = self.argv
+ _reload()
+ self.failUnlessReallyEqual(argv_to_unicode(argv), argu)
+
+ def test_unicode_to_url(self):
+ self.failUnless(unicode_to_url(lumiere_nfc), "lumi\xc3\xa8re")
+
+ @patch('sys.stdout')
+ def test_unicode_to_output(self, mock):
+ if 'output' not in dir(self):
+ return
+
+ mock.encoding = self.output_encoding
+ _reload()
+ self.failUnlessReallyEqual(unicode_to_output(lumiere_nfc), self.output)
+
+ def test_unicode_platform(self):
+ matrix = {
+ 'linux2': False,
+ 'openbsd4': False,
+ 'win32': True,
+ 'darwin': True,
+ }
+
+ _reload()
+ self.failUnlessReallyEqual(unicode_platform(), matrix[self.platform])
+
+
+class UbuntuKarmicUTF8(StringUtils, unittest.TestCase):
+ uname = 'Linux korn 2.6.31-14-generic #48-Ubuntu SMP Fri Oct 16 14:05:01 UTC 2009 x86_64'
+ output = 'lumi\xc3\xa8re'
+ argv = 'lumi\xc3\xa8re'
+ platform = 'linux2'
+ filesystem_encoding = 'UTF-8'
+ output_encoding = 'UTF-8'
+ argv_encoding = 'UTF-8'
+
+class UbuntuKarmicLatin1(StringUtils, unittest.TestCase):
+ uname = 'Linux korn 2.6.31-14-generic #48-Ubuntu SMP Fri Oct 16 14:05:01 UTC 2009 x86_64'
+ output = 'lumi\xe8re'
+ argv = 'lumi\xe8re'
+ platform = 'linux2'
+ filesystem_encoding = 'ISO-8859-1'
+ output_encoding = 'ISO-8859-1'
+ argv_encoding = 'ISO-8859-1'
+
+class WindowsXP(StringUtils, unittest.TestCase):
+ uname = 'Windows XP 5.1.2600 x86 x86 Family 15 Model 75 Step ping 2, AuthenticAMD'
+ output = 'lumi\x8are'
+ platform = 'win32'
+ filesystem_encoding = 'mbcs'
+ output_encoding = 'cp850'
+ argv_encoding = 'ascii'
+
+class WindowsXP_UTF8(StringUtils, unittest.TestCase):
+ uname = 'Windows XP 5.1.2600 x86 x86 Family 15 Model 75 Step ping 2, AuthenticAMD'
+ output = 'lumi\xc3\xa8re'
+ platform = 'win32'
+ filesystem_encoding = 'mbcs'
+ output_encoding = 'cp65001'
+ argv_encoding = 'ascii'
+
+class WindowsVista(StringUtils, unittest.TestCase):
+ uname = 'Windows Vista 6.0.6000 x86 x86 Family 6 Model 15 Stepping 11, GenuineIntel'
+ output = 'lumi\x8are'
+ platform = 'win32'
+ filesystem_encoding = 'mbcs'
+ output_encoding = 'cp850'
+ argv_encoding = 'ascii'
+
+class MacOSXLeopard(StringUtils, unittest.TestCase):
+ uname = 'Darwin g5.local 9.8.0 Darwin Kernel Version 9.8.0: Wed Jul 15 16:57:01 PDT 2009; root:xnu-1228.15.4~1/RELEASE_PPC Power Macintosh powerpc'
+ output = 'lumi\xc3\xa8re'
+ argv = 'lumi\xc3\xa8re'
+ platform = 'darwin'
+ filesystem_encoding = 'utf-8'
+ output_encoding = 'UTF-8'
+ argv_encoding = 'UTF-8'
+
+class MacOSXLeopard7bit(StringUtils, unittest.TestCase):
+ uname = 'Darwin g5.local 9.8.0 Darwin Kernel Version 9.8.0: Wed Jul 15 16:57:01 PDT 2009; root:xnu-1228.15.4~1/RELEASE_PPC Power Macintosh powerpc'
+ platform = 'darwin'
+ filesystem_encoding = 'utf-8'
+ output_encoding = 'US-ASCII'
+ argv_encoding = 'US-ASCII'
+
+class OpenBSD(StringUtils, unittest.TestCase):
+ uname = 'OpenBSD 4.1 GENERIC#187 i386 Intel(R) Celeron(R) CPU 2.80GHz ("GenuineIntel" 686-class)'
+ platform = 'openbsd4'
+ filesystem_encoding = '646'
+ output_encoding = '646'
+ argv_encoding = '646'
+ # Oops, I cannot write filenames containing non-ascii characters
+++ /dev/null
-
-lumiere_nfc = u"lumi\u00E8re"
-Artonwall_nfc = u"\u00C4rtonwall.mp3"
-Artonwall_nfd = u"A\u0308rtonwall.mp3"
-
-TEST_FILENAMES = (
- Artonwall_nfc,
- u'test_file',
- u'Blah blah.txt',
-)
-
-# The following main helps to generate a test class for other operating
-# systems.
-
-if __name__ == "__main__":
- import sys, os
- import tempfile
- import shutil
- import platform
-
- if len(sys.argv) != 2:
- print "Usage: %s lumi<e-grave>re" % sys.argv[0]
- sys.exit(1)
-
- print
- print "class MyWeirdOS(StringUtils, unittest.TestCase):"
- print " uname = '%s'" % ' '.join(platform.uname())
- if sys.platform != "win32":
- print " argv = %s" % repr(sys.argv[1])
- print " platform = '%s'" % sys.platform
- print " filesystem_encoding = '%s'" % sys.getfilesystemencoding()
- print " output_encoding = '%s'" % sys.stdout.encoding
- print " argv_encoding = '%s'" % (sys.platform == "win32" and 'ascii' or sys.stdout.encoding)
-
- try:
- tmpdir = tempfile.mkdtemp()
- for fname in TEST_FILENAMES:
- open(os.path.join(tmpdir, fname), 'w').close()
-
- # Use Unicode API under Windows or MacOS X
- if sys.platform in ('win32', 'darwin'):
- dirlist = os.listdir(unicode(tmpdir))
- else:
- dirlist = os.listdir(tmpdir)
-
- print " dirlist = %s" % repr(dirlist)
- except:
- print " # Oops, I cannot write filenames containing non-ascii characters"
- print
-
- shutil.rmtree(tmpdir)
- sys.exit(0)
-
-from twisted.trial import unittest
-from mock import patch
-import sys
-
-from allmydata.test.common_util import ReallyEqualMixin
-from allmydata.util.stringutils import argv_to_unicode, unicode_to_url, \
- unicode_to_output, unicode_platform, listdir_unicode, open_unicode, \
- FilenameEncodingError, get_output_encoding, _reload
-from allmydata.dirnode import normalize
-
-from twisted.python import usage
-
-class StringUtilsErrors(ReallyEqualMixin, unittest.TestCase):
- def tearDown(self):
- _reload()
-
- @patch('sys.stdout')
- def test_get_output_encoding(self, mock_stdout):
- mock_stdout.encoding = 'UTF-8'
- _reload()
- self.failUnlessReallyEqual(get_output_encoding(), 'utf-8')
-
- mock_stdout.encoding = 'cp65001'
- _reload()
- self.failUnlessReallyEqual(get_output_encoding(), 'utf-8')
-
- mock_stdout.encoding = 'koi8-r'
- _reload()
- self.failUnlessReallyEqual(get_output_encoding(), 'koi8-r')
-
- mock_stdout.encoding = 'nonexistent_encoding'
- self.failUnlessRaises(AssertionError, _reload)
-
- # TODO: mock_stdout.encoding = None
-
- @patch('sys.stdout')
- def test_argv_to_unicode(self, mock):
- mock.encoding = 'utf-8'
- _reload()
-
- self.failUnlessRaises(usage.UsageError,
- argv_to_unicode,
- lumiere_nfc.encode('latin1'))
-
- @patch('sys.stdout')
- def test_unicode_to_output(self, mock):
- # Encoding koi8-r cannot represent e-grave
- mock.encoding = 'koi8-r'
- _reload()
- self.failUnlessRaises(UnicodeEncodeError, unicode_to_output, lumiere_nfc)
-
- @patch('os.listdir')
- def test_no_unicode_normalization(self, mock):
- # Pretend to run on a Unicode platform.
- # We normalized to NFC in 1.7beta, but we now don't.
- orig_platform = sys.platform
- try:
- sys.platform = 'darwin'
- mock.return_value = [Artonwall_nfd]
- _reload()
- self.failUnlessReallyEqual(listdir_unicode(u'/dummy'), [Artonwall_nfd])
- finally:
- sys.platform = orig_platform
-
-# The following tests applies only to platforms which don't store filenames as
-# Unicode entities on the filesystem.
-class StringUtilsNonUnicodePlatform(unittest.TestCase):
- def setUp(self):
- # Mock sys.platform because unicode_platform() uses it
- self.original_platform = sys.platform
- sys.platform = 'linux'
-
- def tearDown(self):
- sys.platform = self.original_platform
- _reload()
-
- @patch('sys.getfilesystemencoding')
- @patch('os.listdir')
- def test_listdir_unicode(self, mock_listdir, mock_getfilesystemencoding):
- # What happens if latin1-encoded filenames are encountered on an UTF-8
- # filesystem?
- mock_listdir.return_value = [
- lumiere_nfc.encode('utf-8'),
- lumiere_nfc.encode('latin1')]
-
- mock_getfilesystemencoding.return_value = 'utf-8'
- _reload()
- self.failUnlessRaises(FilenameEncodingError,
- listdir_unicode,
- u'/dummy')
-
- # We're trying to list a directory whose name cannot be represented in
- # the filesystem encoding. This should fail.
- mock_getfilesystemencoding.return_value = 'ascii'
- _reload()
- self.failUnlessRaises(FilenameEncodingError,
- listdir_unicode,
- u'/' + lumiere_nfc)
-
- @patch('sys.getfilesystemencoding')
- def test_open_unicode(self, mock):
- mock.return_value = 'ascii'
- _reload()
- self.failUnlessRaises(FilenameEncodingError,
- open_unicode,
- lumiere_nfc, 'rb')
-
-class StringUtils(ReallyEqualMixin):
- def setUp(self):
- # Mock sys.platform because unicode_platform() uses it
- self.original_platform = sys.platform
- sys.platform = self.platform
-
- def tearDown(self):
- sys.platform = self.original_platform
- _reload()
-
- @patch('sys.stdout')
- def test_argv_to_unicode(self, mock):
- if 'argv' not in dir(self):
- return
-
- mock.encoding = self.output_encoding
- argu = lumiere_nfc
- argv = self.argv
- _reload()
- self.failUnlessReallyEqual(argv_to_unicode(argv), argu)
-
- def test_unicode_to_url(self):
- self.failUnless(unicode_to_url(lumiere_nfc), "lumi\xc3\xa8re")
-
- @patch('sys.stdout')
- def test_unicode_to_output(self, mock):
- if 'output' not in dir(self):
- return
-
- mock.encoding = self.output_encoding
- _reload()
- self.failUnlessReallyEqual(unicode_to_output(lumiere_nfc), self.output)
-
- def test_unicode_platform(self):
- matrix = {
- 'linux2': False,
- 'openbsd4': False,
- 'win32': True,
- 'darwin': True,
- }
-
- _reload()
- self.failUnlessReallyEqual(unicode_platform(), matrix[self.platform])
-
- @patch('sys.getfilesystemencoding')
- @patch('os.listdir')
- def test_listdir_unicode(self, mock_listdir, mock_getfilesystemencoding):
- if 'dirlist' not in dir(self):
- return
-
- try:
- u"test".encode(self.filesystem_encoding)
- except (LookupError, AttributeError):
- raise unittest.SkipTest("This platform does not support the '%s' filesystem encoding "
- "that we are testing for the benefit of a different platform."
- % (self.filesystem_encoding,))
-
- mock_listdir.return_value = self.dirlist
- mock_getfilesystemencoding.return_value = self.filesystem_encoding
-
- _reload()
- filenames = listdir_unicode(u'/dummy')
-
- self.failUnlessEqual(set([normalize(fname) for fname in filenames]),
- set(TEST_FILENAMES))
-
- @patch('sys.getfilesystemencoding')
- @patch('__builtin__.open')
- def test_open_unicode(self, mock_open, mock_getfilesystemencoding):
- mock_getfilesystemencoding.return_value = self.filesystem_encoding
- fn = u'/dummy_directory/" + lumiere_nfc + ".txt'
-
- try:
- u"test".encode(self.filesystem_encoding)
- except (LookupError, AttributeError):
- raise unittest.SkipTest("This platform does not support the '%s' filesystem encoding "
- "that we are testing for the benefit of a different platform."
- % (self.filesystem_encoding,))
-
- _reload()
- try:
- open_unicode(fn, 'rb')
- except FilenameEncodingError:
- return
-
- # Pass Unicode string to open() on Unicode platforms
- if unicode_platform():
- mock_open.assert_called_with(fn, 'rb')
-
- # Pass correctly encoded bytestrings to open() on non-Unicode platforms
- else:
- fn_bytestring = fn.encode(self.filesystem_encoding)
- mock_open.assert_called_with(fn_bytestring, 'rb')
-
-
-class UbuntuKarmicUTF8(StringUtils, unittest.TestCase):
- uname = 'Linux korn 2.6.31-14-generic #48-Ubuntu SMP Fri Oct 16 14:05:01 UTC 2009 x86_64'
- output = 'lumi\xc3\xa8re'
- argv = 'lumi\xc3\xa8re'
- platform = 'linux2'
- filesystem_encoding = 'UTF-8'
- output_encoding = 'UTF-8'
- argv_encoding = 'UTF-8'
- dirlist = ['test_file', '\xc3\x84rtonwall.mp3', 'Blah blah.txt']
-
-class UbuntuKarmicLatin1(StringUtils, unittest.TestCase):
- uname = 'Linux korn 2.6.31-14-generic #48-Ubuntu SMP Fri Oct 16 14:05:01 UTC 2009 x86_64'
- output = 'lumi\xe8re'
- argv = 'lumi\xe8re'
- platform = 'linux2'
- filesystem_encoding = 'ISO-8859-1'
- output_encoding = 'ISO-8859-1'
- argv_encoding = 'ISO-8859-1'
- dirlist = ['test_file', 'Blah blah.txt', '\xc4rtonwall.mp3']
-
-class WindowsXP(StringUtils, unittest.TestCase):
- uname = 'Windows XP 5.1.2600 x86 x86 Family 15 Model 75 Step ping 2, AuthenticAMD'
- output = 'lumi\x8are'
- platform = 'win32'
- filesystem_encoding = 'mbcs'
- output_encoding = 'cp850'
- argv_encoding = 'ascii'
- dirlist = [u'Blah blah.txt', u'test_file', u'\xc4rtonwall.mp3']
-
-class WindowsXP_UTF8(StringUtils, unittest.TestCase):
- uname = 'Windows XP 5.1.2600 x86 x86 Family 15 Model 75 Step ping 2, AuthenticAMD'
- output = 'lumi\xc3\xa8re'
- platform = 'win32'
- filesystem_encoding = 'mbcs'
- output_encoding = 'cp65001'
- argv_encoding = 'ascii'
- dirlist = [u'Blah blah.txt', u'test_file', u'\xc4rtonwall.mp3']
-
-class WindowsVista(StringUtils, unittest.TestCase):
- uname = 'Windows Vista 6.0.6000 x86 x86 Family 6 Model 15 Stepping 11, GenuineIntel'
- output = 'lumi\x8are'
- platform = 'win32'
- filesystem_encoding = 'mbcs'
- output_encoding = 'cp850'
- argv_encoding = 'ascii'
- dirlist = [u'Blah blah.txt', u'test_file', u'\xc4rtonwall.mp3']
-
-class MacOSXLeopard(StringUtils, unittest.TestCase):
- uname = 'Darwin g5.local 9.8.0 Darwin Kernel Version 9.8.0: Wed Jul 15 16:57:01 PDT 2009; root:xnu-1228.15.4~1/RELEASE_PPC Power Macintosh powerpc'
- output = 'lumi\xc3\xa8re'
- argv = 'lumi\xc3\xa8re'
- platform = 'darwin'
- filesystem_encoding = 'utf-8'
- output_encoding = 'UTF-8'
- argv_encoding = 'UTF-8'
- dirlist = [u'A\u0308rtonwall.mp3', u'Blah blah.txt', u'test_file']
-
-class MacOSXLeopard7bit(StringUtils, unittest.TestCase):
- uname = 'Darwin g5.local 9.8.0 Darwin Kernel Version 9.8.0: Wed Jul 15 16:57:01 PDT 2009; root:xnu-1228.15.4~1/RELEASE_PPC Power Macintosh powerpc'
- platform = 'darwin'
- filesystem_encoding = 'utf-8'
- output_encoding = 'US-ASCII'
- argv_encoding = 'US-ASCII'
- dirlist = [u'A\u0308rtonwall.mp3', u'Blah blah.txt', u'test_file']
-
-class OpenBSD(StringUtils, unittest.TestCase):
- uname = 'OpenBSD 4.1 GENERIC#187 i386 Intel(R) Celeron(R) CPU 2.80GHz ("GenuineIntel" 686-class)'
- platform = 'openbsd4'
- filesystem_encoding = '646'
- output_encoding = '646'
- argv_encoding = '646'
- # Oops, I cannot write filenames containing non-ascii characters
--- /dev/null
+"""
+Functions used to convert inputs from whatever encoding used in the system to
+unicode and back.
+"""
+
+import sys
+import re
+from allmydata.util.assertutil import precondition
+from twisted.python import usage
+import locale
+from allmydata.util import log
+
+
+def _canonical_encoding(encoding):
+ if encoding is None:
+ log.msg("Warning: falling back to UTF-8 encoding.", level=log.WEIRD)
+ encoding = 'utf-8'
+ encoding = encoding.lower()
+ if encoding == "cp65001":
+ encoding = 'utf-8'
+ elif encoding == "us-ascii" or encoding == "646" or encoding == "ansi_x3.4-1968":
+ encoding = 'ascii'
+
+ # sometimes Python returns an encoding name that it doesn't support for conversion
+ # fail early if this happens
+ try:
+ u"test".encode(encoding)
+ except (LookupError, AttributeError):
+ raise AssertionError("The character encoding '%s' is not supported for conversion." % (encoding,))
+
+ return encoding
+
+filesystem_encoding = None
+output_encoding = None
+argv_encoding = None
+is_unicode_platform = False
+
+def _reload():
+ global filesystem_encoding, output_encoding, argv_encoding, is_unicode_platform
+
+ filesystem_encoding = _canonical_encoding(sys.getfilesystemencoding())
+
+ outenc = None
+ if hasattr(sys.stdout, 'encoding'):
+ outenc = sys.stdout.encoding
+ if outenc is None:
+ try:
+ outenc = locale.getpreferredencoding()
+ except Exception:
+ pass # work around <http://bugs.python.org/issue1443504>
+ output_encoding = _canonical_encoding(outenc)
+
+ if sys.platform == 'win32':
+ # Unicode arguments are not supported on Windows yet; see #565 and #1074.
+ argv_encoding = 'ascii'
+ else:
+ argv_encoding = output_encoding
+ is_unicode_platform = sys.platform in ["win32", "darwin"]
+
+_reload()
+
+
+def get_filesystem_encoding():
+ """
+ Returns expected encoding for local filenames.
+ """
+ return filesystem_encoding
+
+def get_output_encoding():
+ """
+ Returns expected encoding for writing to stdout or stderr.
+ """
+ return output_encoding
+
+def get_argv_encoding():
+ """
+ Returns expected encoding for command-line arguments.
+ """
+ return argv_encoding
+
+def argv_to_unicode(s):
+ """
+ Decode given argv element to unicode. If this fails, raise a UsageError.
+ """
+ precondition(isinstance(s, str), s)
+
+ try:
+ return unicode(s, argv_encoding)
+ except UnicodeDecodeError:
+ raise usage.UsageError("Argument %s cannot be decoded as %s." %
+ (quote_output(s), argv_encoding))
+
+def unicode_to_url(s):
+ """
+ Encode an unicode object used in an URL.
+ """
+ # According to RFC 2718, non-ascii characters in URLs must be UTF-8 encoded.
+
+ # FIXME
+ return to_str(s)
+ #precondition(isinstance(s, unicode), s)
+ #return s.encode('utf-8')
+
+def to_str(s):
+ if s is None or isinstance(s, str):
+ return s
+ return s.encode('utf-8')
+
+def to_argv(s):
+ if isinstance(s, str):
+ return s
+ return s.encode(argv_encoding)
+
+PRINTABLE_ASCII = re.compile(r'^[ -~\n\r]*$', re.DOTALL)
+PRINTABLE_8BIT = re.compile(r'^[ -&(-~\n\r\x80-\xFF]*$', re.DOTALL)
+
+def is_printable_ascii(s):
+ return PRINTABLE_ASCII.search(s) is not None
+
+def unicode_to_output(s):
+ """
+ Encode an unicode object for representation on stdout or stderr.
+ """
+ precondition(isinstance(s, unicode), s)
+
+ try:
+ out = s.encode(output_encoding)
+ except (UnicodeEncodeError, UnicodeDecodeError):
+ raise UnicodeEncodeError(output_encoding, s, 0, 0,
+ "A string could not be encoded as %s for output to the terminal:\n%r" %
+ (output_encoding, repr(s)))
+
+ if PRINTABLE_8BIT.search(out) is None:
+ raise UnicodeEncodeError(output_encoding, s, 0, 0,
+ "A string encoded as %s for output to the terminal contained unsafe bytes:\n%r" %
+ (output_encoding, repr(s)))
+ return out
+
+def quote_output(s, quotemarks=True, encoding=None):
+ """
+ Encode either a Unicode string or a UTF-8-encoded bytestring for representation
+ on stdout or stderr, tolerating errors. If 'quotemarks' is True, the string is
+ always surrounded by single quotes; otherwise, it is quoted only if necessary to
+ avoid ambiguity or control bytes in the output.
+ """
+ precondition(isinstance(s, (str, unicode)), s)
+
+ if isinstance(s, str):
+ try:
+ s = s.decode('utf-8')
+ except UnicodeDecodeError:
+ return 'b' + repr(s)
+
+ try:
+ out = s.encode(encoding or output_encoding)
+ except (UnicodeEncodeError, UnicodeDecodeError):
+ return repr(s)
+
+ if PRINTABLE_8BIT.search(out) is None:
+ return repr(out)
+
+ if quotemarks:
+ return "'" + out.replace("\\", "\\\\").replace("'", "\'") + "'"
+ else:
+ return out
+
+def quote_path(path, quotemarks=True):
+ return quote_output("/".join(map(to_str, path)), quotemarks=quotemarks)
+
+
+def unicode_platform():
+ """
+ Does the current platform handle Unicode filenames natively?
+ """
+ return is_unicode_platform
return rf.read()
finally:
rf.close()
+
+def put_file(pathname, inf):
+ # TODO: create temporary file and move into place?
+ outf = open_expanduser(pathname, "wb")
+ try:
+ while True:
+ data = inf.read(32768)
+ if not data:
+ break
+ outf.write(data)
+ finally:
+ outf.close()
+
+def open_expanduser(path, mode):
+ assert isinstance(path, unicode), path
+ return open(os.path.expanduser(path), mode)
+
+def abspath_expanduser(path):
+ assert isinstance(path, unicode), path
+ return os.path.abspath(os.path.expanduser(path))
+++ /dev/null
-"""
-Functions used to convert inputs from whatever encoding used in the system to
-unicode and back.
-"""
-
-import sys
-import os
-import re
-from allmydata.util.assertutil import precondition
-from twisted.python import usage
-import locale
-from allmydata.util import log
-
-
-def _canonical_encoding(encoding):
- if encoding is None:
- log.msg("Warning: falling back to UTF-8 encoding.", level=log.WEIRD)
- encoding = 'utf-8'
- encoding = encoding.lower()
- if encoding == "cp65001":
- encoding = 'utf-8'
- elif encoding == "us-ascii" or encoding == "646" or encoding == "ansi_x3.4-1968":
- encoding = 'ascii'
-
- # sometimes Python returns an encoding name that it doesn't support for conversion
- # fail early if this happens
- try:
- u"test".encode(encoding)
- except (LookupError, AttributeError):
- raise AssertionError("The character encoding '%s' is not supported for conversion." % (encoding,))
-
- return encoding
-
-filesystem_encoding = None
-output_encoding = None
-argv_encoding = None
-is_unicode_platform = False
-
-def _reload():
- global filesystem_encoding, output_encoding, argv_encoding, is_unicode_platform
-
- filesystem_encoding = _canonical_encoding(sys.getfilesystemencoding())
-
- outenc = None
- if hasattr(sys.stdout, 'encoding'):
- outenc = sys.stdout.encoding
- if outenc is None:
- try:
- outenc = locale.getpreferredencoding()
- except Exception:
- pass # work around <http://bugs.python.org/issue1443504>
- output_encoding = _canonical_encoding(outenc)
-
- if sys.platform == 'win32':
- # Unicode arguments are not supported on Windows yet; see #565 and #1074.
- argv_encoding = 'ascii'
- else:
- argv_encoding = output_encoding
- is_unicode_platform = sys.platform in ["win32", "darwin"]
-
-_reload()
-
-
-def get_filesystem_encoding():
- """
- Returns expected encoding for local filenames.
- """
- return filesystem_encoding
-
-def get_output_encoding():
- """
- Returns expected encoding for writing to stdout or stderr.
- """
- return output_encoding
-
-def get_argv_encoding():
- """
- Returns expected encoding for command-line arguments.
- """
- return argv_encoding
-
-def argv_to_unicode(s):
- """
- Decode given argv element to unicode. If this fails, raise a UsageError.
- """
- precondition(isinstance(s, str), s)
-
- try:
- return unicode(s, argv_encoding)
- except UnicodeDecodeError:
- raise usage.UsageError("Argument %s cannot be decoded as %s." %
- (quote_output(s), argv_encoding))
-
-def unicode_to_url(s):
- """
- Encode an unicode object used in an URL.
- """
- # According to RFC 2718, non-ascii characters in URLs must be UTF-8 encoded.
-
- # FIXME
- return to_str(s)
- #precondition(isinstance(s, unicode), s)
- #return s.encode('utf-8')
-
-def to_str(s):
- if s is None or isinstance(s, str):
- return s
- return s.encode('utf-8')
-
-def to_argv(s):
- if isinstance(s, str):
- return s
- return s.encode(argv_encoding)
-
-PRINTABLE_ASCII = re.compile(r'^[ -~\n\r]*$', re.DOTALL)
-PRINTABLE_8BIT = re.compile(r'^[ -&(-~\n\r\x80-\xFF]*$', re.DOTALL)
-
-def is_printable_ascii(s):
- return PRINTABLE_ASCII.search(s) is not None
-
-def unicode_to_output(s):
- """
- Encode an unicode object for representation on stdout or stderr.
- """
- precondition(isinstance(s, unicode), s)
-
- try:
- out = s.encode(output_encoding)
- except (UnicodeEncodeError, UnicodeDecodeError):
- raise UnicodeEncodeError(output_encoding, s, 0, 0,
- "A string could not be encoded as %s for output to the terminal:\n%r" %
- (output_encoding, repr(s)))
-
- if PRINTABLE_8BIT.search(out) is None:
- raise UnicodeEncodeError(output_encoding, s, 0, 0,
- "A string encoded as %s for output to the terminal contained unsafe bytes:\n%r" %
- (output_encoding, repr(s)))
- return out
-
-def quote_output(s, quotemarks=True, encoding=None):
- """
- Encode either a Unicode string or a UTF-8-encoded bytestring for representation
- on stdout or stderr, tolerating errors. If 'quotemarks' is True, the string is
- always surrounded by single quotes; otherwise, it is quoted only if necessary to
- avoid ambiguity or control bytes in the output.
- """
- precondition(isinstance(s, (str, unicode)), s)
-
- if isinstance(s, str):
- try:
- s = s.decode('utf-8')
- except UnicodeDecodeError:
- return 'b' + repr(s)
-
- try:
- out = s.encode(encoding or output_encoding)
- except (UnicodeEncodeError, UnicodeDecodeError):
- return repr(s)
-
- if PRINTABLE_8BIT.search(out) is None:
- return repr(out)
-
- if quotemarks:
- return "'" + out.replace("\\", "\\\\").replace("'", "\'") + "'"
- else:
- return out
-
-def quote_path(path, quotemarks=True):
- return quote_output("/".join(map(to_str, path)), quotemarks=quotemarks)
-
-
-def unicode_platform():
- """
- Does the current platform handle Unicode filenames natively?
- """
- return is_unicode_platform
-
-class FilenameEncodingError(Exception):
- """
- Filename cannot be encoded using the current encoding of your filesystem
- (%s). Please configure your locale correctly or rename this file.
- """
- pass
-
-def listdir_unicode_fallback(path):
- """
- This function emulates a fallback Unicode API similar to one available
- under Windows or MacOS X.
-
- If badly encoded filenames are encountered, an exception is raised.
- """
- precondition(isinstance(path, unicode), path)
-
- try:
- byte_path = path.encode(filesystem_encoding)
- except (UnicodeEncodeError, UnicodeDecodeError):
- raise FilenameEncodingError(path)
-
- try:
- return [unicode(fn, filesystem_encoding) for fn in os.listdir(byte_path)]
- except UnicodeDecodeError:
- raise FilenameEncodingError(fn)
-
-def listdir_unicode(path):
- """
- Wrapper around listdir() which provides safe access to the convenient
- Unicode API even under platforms that don't provide one natively.
- """
- precondition(isinstance(path, unicode), path)
-
- # On Windows and MacOS X, the Unicode API is used
- # On other platforms (ie. Unix systems), the byte-level API is used
-
- if is_unicode_platform:
- return os.listdir(path)
- else:
- return listdir_unicode_fallback(path)
-
-def open_unicode(path, mode):
- """
- Wrapper around open() which provides safe access to the convenient Unicode
- API even under Unix.
- """
- precondition(isinstance(path, unicode), path)
-
- if is_unicode_platform:
- return open(os.path.expanduser(path), mode)
- else:
- try:
- return open(os.path.expanduser(path.encode(filesystem_encoding)), mode)
- except UnicodeEncodeError:
- raise FilenameEncodingError(path)
-
-def abspath_expanduser_unicode(path):
- precondition(isinstance(path, unicode), path)
-
- if is_unicode_platform:
- return os.path.abspath(os.path.expanduser(path))
- else:
- try:
- pathstr = path.encode(filesystem_encoding)
- return os.path.abspath(os.path.expanduser(pathstr)).decode(filesystem_encoding)
- except (UnicodeEncodeError, UnicodeDecodeError):
- raise FilenameEncodingError(path)
MustBeReadonlyError, MustNotBeUnknownRWError
from allmydata.mutable.common import UnrecoverableFileError
from allmydata.util import abbreviate
-from allmydata.util.stringutils import to_str
+from allmydata.util.encodingutil import to_str
class IOpHandleTable(Interface):
pass