noisy = True
use_foolscap_logging = True
-from allmydata.util.log import NOISY, OPERATIONAL, SCARY
+from allmydata.util.log import NOISY, OPERATIONAL, \
+ msg as _msg, err as _err, PrefixingLogMixin as _PrefixingLogMixin
if use_foolscap_logging:
- from allmydata.util.log import msg as logmsg, err as logerr, PrefixingLogMixin
-else:
+ (logmsg, logerr, PrefixingLogMixin) = (_msg, _err, _PrefixingLogMixin)
+else: # pragma: no cover
def logmsg(s, level=None):
print s
def logerr(s, level=None):
def log(self, s, level=None):
print s
-if noisy:
- def eventually_callback(d):
- s = traceback.format_stack()
- def _cb(res):
- try:
- if noisy: logmsg("CALLBACK %r" % (d,), level=NOISY)
- d.callback(res)
- except: # pragma: no cover
- logerr("Failed to callback %r with %r\n"
- "Original stack:\n!%s" %
- (d, res, '!'.join(s)), level=SCARY)
- traceback.print_exc()
- raise
- return lambda res: eventually(_cb, res)
-
- def eventually_errback(d):
- s = traceback.format_stack()
- def _eb(err):
- try:
- if noisy: logmsg("ERRBACK %r %r" % (d, err), level=NOISY)
- d.errback(err)
- except: # pragma: no cover
- logerr("Failed to errback %r with %r\n"
- "Original stack:\n!%s" %
- (d, err, '!'.join(s)), level=SCARY)
- traceback.print_exc()
- raise
- return lambda err: eventually(_eb, err)
-else:
- def eventually_callback(d):
- return lambda res: eventually(d.callback, res)
-
- def eventually_errback(d):
- return lambda err: eventually(d.errback, err)
+
+def eventually_callback(d):
+ return lambda res: eventually(d.callback, res)
+
+def eventually_errback(d):
+ return lambda err: eventually(d.errback, err)
+
+
+def _utf8(x):
+ if isinstance(x, unicode):
+ return x.encode('utf-8')
+ if isinstance(x, str):
+ return x
+ return repr(x)
def _convert_error(res, request):
# original raiser of SFTPError has responsibility to ensure anonymity
raise err
if err.check(NoSuchChildError):
- childname = err.value.args[0].encode('utf-8')
+ childname = _utf8(err.value.args[0])
raise SFTPError(FX_NO_SUCH_FILE, childname)
if err.check(NotWriteableError):
- msg = err.value.args[0].encode('utf-8')
+ msg = _utf8(err.value.args[0])
raise SFTPError(FX_PERMISSION_DENIED, msg)
if err.check(ExistingChildError):
# Versions of SFTP after v3 (which is what twisted.conch implements)
# FX_FAILURE. The gvfs SFTP backend, for example, depends on this
# to translate the error to the equivalent of POSIX EEXIST, which is
# necessary for some picky programs (such as gedit).
- msg = err.value.args[0].encode('utf-8')
+ msg = _utf8(err.value.args[0])
raise SFTPError(FX_FAILURE, msg)
if err.check(NotImplementedError):
- raise SFTPError(FX_OP_UNSUPPORTED, str(err.value))
+ raise SFTPError(FX_OP_UNSUPPORTED, _utf8(err.value))
if err.check(EOFError):
raise SFTPError(FX_EOF, "end of file reached")
if err.check(defer.FirstError):
_convert_error(err.value.subFailure, request)
# We assume that the error message is not anonymity-sensitive.
- raise SFTPError(FX_FAILURE, str(err.value))
+ raise SFTPError(FX_FAILURE, _utf8(err.value))
def _repr_flags(flags):
return "|".join([f for f in
- [(flags & FXF_READ) and "FXF_READ" or None,
- (flags & FXF_WRITE) and "FXF_WRITE" or None,
+ [(flags & FXF_READ) and "FXF_READ" or None,
+ (flags & FXF_WRITE) and "FXF_WRITE" or None,
(flags & FXF_APPEND) and "FXF_APPEND" or None,
- (flags & FXF_CREAT) and "FXF_CREAT" or None,
- (flags & FXF_TRUNC) and "FXF_TRUNC" or None,
- (flags & FXF_EXCL) and "FXF_EXCL" or None,
+ (flags & FXF_CREAT) and "FXF_CREAT" or None,
+ (flags & FXF_TRUNC) and "FXF_TRUNC" or None,
+ (flags & FXF_EXCL) and "FXF_EXCL" or None,
]
if f])
# We don't know how many links there really are to this object.
st_nlink = 1
- # From <http://twistedmatrix.com/trac/browser/trunk/twisted/conch/ls.py?rev=25412>.
+ # Based on <http://twistedmatrix.com/trac/browser/trunk/twisted/conch/ls.py?rev=25412>.
# We can't call the version in Twisted because we might have a version earlier than
# <http://twistedmatrix.com/trac/changeset/25412> (released in Twisted 8.2).
mode = st_mode
perms = array.array('c', '-'*10)
ft = stat.S_IFMT(mode)
- if stat.S_ISDIR(ft): perms[0] = 'd'
+ if stat.S_ISDIR(ft): perms[0] = 'd'
elif stat.S_ISCHR(ft): perms[0] = 'c'
elif stat.S_ISBLK(ft): perms[0] = 'b'
elif stat.S_ISREG(ft): perms[0] = '-'
def _is_readonly(parent_readonly, child):
- """Whether child should be treated as having read-only permissions when listed
- in parent."""
+ """Whether child should be listed as having read-only permissions in parent."""
if child.is_unknown():
return True
-import re, struct
+import re, struct, traceback
from stat import S_IFREG, S_IFDIR
from twisted.trial import unittest
from twisted.conch.ssh import filetransfer as sftp
from allmydata.frontends import sftpd
-import traceback
-
-"""
-import sys
-def trace_exceptions(frame, event, arg):
- if event != 'exception':
- return
- co = frame.f_code
- func_name = co.co_name
- line_no = frame.f_lineno
- filename = co.co_filename
- exc_type, exc_value, exc_traceback = arg
- print 'Tracing exception: %r %r on line %r of %r in %r' % \
- (exc_type.__name__, exc_value, line_no, func_name, filename)
-
-def trace_calls(frame, event, arg):
- if event != 'call':
- return
- return trace_exceptions
-
-sys.settrace(trace_calls)
-"""
-
-timeout = 240
-
from allmydata.interfaces import IDirectoryNode, ExistingChildError, NoSuchChildError
from allmydata.mutable.common import NotWriteableError
from allmydata.test.no_network import GridTestMixin
from allmydata.test.common import ShouldFailMixin
+timeout = 240
+
class Handler(GridTestMixin, ShouldFailMixin, unittest.TestCase):
"""This is a no-network unit test of the SFTPHandler class."""