-import os, tempfile, heapq, binascii, traceback, array, stat, struct
+import heapq, traceback, array, stat, struct
+from types import NoneType
from stat import S_IFREG, S_IFDIR
from time import time, strftime, localtime
from twisted.conch.ssh import factory, keys, session
from twisted.conch.ssh.filetransfer import FileTransferServer, SFTPError, \
FX_NO_SUCH_FILE, FX_OP_UNSUPPORTED, FX_PERMISSION_DENIED, FX_EOF, \
- FX_BAD_MESSAGE, FX_FAILURE
+ FX_BAD_MESSAGE, FX_FAILURE, FX_OK
from twisted.conch.ssh.filetransfer import FXF_READ, FXF_WRITE, FXF_APPEND, \
FXF_CREAT, FXF_TRUNC, FXF_EXCL
from twisted.conch.interfaces import ISFTPServer, ISFTPFile, IConchUser, ISession
from allmydata.util.consumer import download_to_data
from allmydata.interfaces import IFileNode, IDirectoryNode, ExistingChildError, \
- NoSuchChildError
+ NoSuchChildError, ChildOfWrongTypeError
from allmydata.mutable.common import NotWriteableError
+from allmydata.mutable.publish import MutableFileHandle
from allmydata.immutable.upload import FileHandle
-
-from pycryptopp.cipher.aes import AES
-
-# twisted.conch.ssh.filetransfer generates this warning, but not when it is imported,
-# only on an error.
-import warnings
-warnings.filterwarnings("ignore", category=DeprecationWarning,
- message="BaseException.message has been deprecated as of Python 2.6",
- module=".*filetransfer", append=True)
+from allmydata.dirnode import update_metadata
+from allmydata.util.fileutil import EncryptedTemporaryFile
noisy = True
use_foolscap_logging = True
-from allmydata.util.log import NOISY, OPERATIONAL, \
+from allmydata.util.log import NOISY, OPERATIONAL, WEIRD, \
msg as _msg, err as _err, PrefixingLogMixin as _PrefixingLogMixin
if use_foolscap_logging:
def logerr(s, level=None):
print s
class PrefixingLogMixin:
- def __init__(self, facility=None):
- pass
+ def __init__(self, facility=None, prefix=''):
+ self.prefix = prefix
def log(self, s, level=None):
- print s
+ print "%r %s" % (self.prefix, s)
def eventually_callback(d):
def _convert_error(res, request):
+    """If res is not a Failure, return it; otherwise, raise the appropriate
+    SFTPError."""
+
if not isinstance(res, Failure):
logged_res = res
if isinstance(res, str): logged_res = "<data of length %r>" % (len(res),)
logmsg("RAISE %r %r" % (request, err.value), level=OPERATIONAL)
try:
if noisy: logmsg(traceback.format_exc(err.value), level=NOISY)
- except: # pragma: no cover
+ except Exception: # pragma: no cover
pass
# The message argument to SFTPError must not reveal information that
- # might compromise anonymity.
+ # might compromise anonymity, if we are running over an anonymous network.
if err.check(SFTPError):
# original raiser of SFTPError has responsibility to ensure anonymity
if err.check(NoSuchChildError):
childname = _utf8(err.value.args[0])
raise SFTPError(FX_NO_SUCH_FILE, childname)
- if err.check(NotWriteableError):
+ if err.check(NotWriteableError) or err.check(ChildOfWrongTypeError):
msg = _utf8(err.value.args[0])
raise SFTPError(FX_PERMISSION_DENIED, msg)
if err.check(ExistingChildError):
st_gid = "tahoe"
st_mtime = attrs.get("mtime", 0)
st_mode = attrs["permissions"]
- # TODO: check that clients are okay with this being a "?".
- # (They should be because the longname is intended for human
- # consumption.)
- st_size = attrs.get("size", "?")
+
+ # Some clients won't tolerate '?' in the size field (#1337).
+ st_size = attrs.get("size", 0)
+
# We don't know how many links there really are to this object.
st_nlink = 1
mode = st_mode
perms = array.array('c', '-'*10)
ft = stat.S_IFMT(mode)
- if stat.S_ISDIR(ft): perms[0] = 'd'
- elif stat.S_ISCHR(ft): perms[0] = 'c'
- elif stat.S_ISBLK(ft): perms[0] = 'b'
- elif stat.S_ISREG(ft): perms[0] = '-'
- elif stat.S_ISFIFO(ft): perms[0] = 'f'
- elif stat.S_ISLNK(ft): perms[0] = 'l'
- elif stat.S_ISSOCK(ft): perms[0] = 's'
+ if stat.S_ISDIR(ft): perms[0] = 'd'
+ elif stat.S_ISREG(ft): perms[0] = '-'
else: perms[0] = '?'
# user
if mode&stat.S_IRUSR: perms[1] = 'r'
return l
-def _is_readonly(parent_readonly, child):
+def _no_write(parent_readonly, child, metadata=None):
"""Whether child should be listed as having read-only permissions in parent."""
if child.is_unknown():
return True
elif child.is_mutable():
return child.is_readonly()
+ elif parent_readonly or IDirectoryNode.providedBy(child):
+ return True
else:
- return parent_readonly
+ return metadata is not None and metadata.get('no-write', False)
def _populate_attrs(childnode, metadata, size=None):
# bits, otherwise the client may refuse to open a directory.
# Also, sshfs run as a non-root user requires files and directories
# to be world-readable/writeable.
+ # It is important that we never set the executable bits on files.
#
# Directories and unknown nodes have no size, and SFTP doesn't
# require us to make one up.
perms = S_IFREG | 0666
if metadata:
- assert 'readonly' in metadata, metadata
- if metadata['readonly']:
+ if metadata.get('no-write', False):
perms &= S_IFDIR | S_IFREG | 0555 # clear 'w' bits
- # see webapi.txt for what these times mean
+ # See webapi.txt for what these times mean.
+ # We would prefer to omit atime, but SFTP version 3 can only
+ # accept mtime if atime is also set.
if 'linkmotime' in metadata.get('tahoe', {}):
- attrs['mtime'] = _to_sftp_time(metadata['tahoe']['linkmotime'])
+ attrs['ctime'] = attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['tahoe']['linkmotime'])
elif 'mtime' in metadata:
- # We would prefer to omit atime, but SFTP version 3 can only
- # accept mtime if atime is also set.
- attrs['mtime'] = _to_sftp_time(metadata['mtime'])
- attrs['atime'] = attrs['mtime']
+ attrs['ctime'] = attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['mtime'])
if 'linkcrtime' in metadata.get('tahoe', {}):
attrs['createtime'] = _to_sftp_time(metadata['tahoe']['linkcrtime'])
- if 'ctime' in metadata:
- attrs['ctime'] = _to_sftp_time(metadata['ctime'])
-
attrs['permissions'] = perms
# twisted.conch.ssh.filetransfer only implements SFTP version 3,
return attrs
-class EncryptedTemporaryFile(PrefixingLogMixin):
- # not implemented: next, readline, readlines, xreadlines, writelines
+def _attrs_to_metadata(attrs):
+ metadata = {}
- def __init__(self):
- PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
- self.file = tempfile.TemporaryFile()
- self.key = os.urandom(16) # AES-128
-
- def _crypt(self, offset, data):
- # FIXME: use random-access AES (pycryptopp ticket #18)
- offset_big = offset // 16
- offset_small = offset % 16
- iv = binascii.unhexlify("%032x" % offset_big)
- cipher = AES(self.key, iv=iv)
- cipher.process("\x00"*offset_small)
- return cipher.process(data)
+ for key in attrs:
+ if key == "mtime" or key == "ctime" or key == "createtime":
+ metadata[key] = long(attrs[key])
+ elif key.startswith("ext_"):
+ metadata[key] = str(attrs[key])
- def close(self):
- self.file.close()
+ perms = attrs.get('permissions', stat.S_IWUSR)
+ if not (perms & stat.S_IWUSR):
+ metadata['no-write'] = True
- def flush(self):
- self.file.flush()
+ return metadata
- def seek(self, offset, whence=os.SEEK_SET):
- if noisy: self.log(".seek(%r, %r)" % (offset, whence), level=NOISY)
- self.file.seek(offset, whence)
- def tell(self):
- offset = self.file.tell()
- if noisy: self.log(".tell() = %r" % (offset,), level=NOISY)
- return offset
+def _direntry_for(filenode_or_parent, childname, filenode=None):
+ assert isinstance(childname, (unicode, NoneType)), childname
- def read(self, size=-1):
- if noisy: self.log(".read(%r)" % (size,), level=NOISY)
- index = self.file.tell()
- ciphertext = self.file.read(size)
- plaintext = self._crypt(index, ciphertext)
- return plaintext
+ if childname is None:
+ filenode_or_parent = filenode
- def write(self, plaintext):
- if noisy: self.log(".write(<data of length %r>)" % (len(plaintext),), level=NOISY)
- index = self.file.tell()
- ciphertext = self._crypt(index, plaintext)
- self.file.write(ciphertext)
+ if filenode_or_parent:
+ rw_uri = filenode_or_parent.get_write_uri()
+ if rw_uri and childname:
+ return rw_uri + "/" + childname.encode('utf-8')
+ else:
+ return rw_uri
- def truncate(self, newsize):
- if noisy: self.log(".truncate(%r)" % (newsize,), level=NOISY)
- self.file.truncate(newsize)
+ return None
class OverwriteableFileConsumer(PrefixingLogMixin):
The temporary file reflects the contents of the file that I represent, except that:
- regions that have neither been downloaded nor overwritten, if present,
- contain zeroes.
+ contain garbage.
- the temporary file may be shorter than the represented file (it is never longer).
The latter's current size is stored in self.current_size.
This abstraction is mostly independent of SFTP. Consider moving it, if it is found
useful for other frontends."""
- def __init__(self, check_abort, download_size, tempfile_maker):
+ def __init__(self, download_size, tempfile_maker):
PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
- if noisy: self.log(".__init__(%r, %r, %r)" % (check_abort, download_size, tempfile_maker), level=NOISY)
- self.check_abort = check_abort
+ if noisy: self.log(".__init__(%r, %r)" % (download_size, tempfile_maker), level=NOISY)
self.download_size = download_size
self.current_size = download_size
self.f = tempfile_maker()
(size, self.current_size, self.downloaded), level=NOISY)
if size < self.current_size or size < self.downloaded:
self.f.truncate(size)
+ if size > self.current_size:
+ self.overwrite(self.current_size, "\x00" * (size - self.current_size))
self.current_size = size
+
+        # restore the invariant self.download_size <= self.current_size
if size < self.download_size:
self.download_size = size
+
if self.downloaded >= self.download_size:
self.finish()
def registerProducer(self, p, streaming):
if noisy: self.log(".registerProducer(%r, streaming=%r)" % (p, streaming), level=NOISY)
+ if self.producer is not None:
+ raise RuntimeError("producer is already registered")
+
self.producer = p
if streaming:
# call resumeProducing once to start things off
if noisy: self.log(".write(<data of length %r>)" % (len(data),), level=NOISY)
if self.is_closed:
return
- if self.check_abort():
- self.close()
- return
if self.downloaded >= self.download_size:
return
return
if noisy: self.log("MILESTONE %r %r" % (next, d), level=NOISY)
heapq.heappop(self.milestones)
- eventually_callback(d)(None)
+ eventually(d.callback, None)
if milestone >= self.download_size:
self.finish()
def overwrite(self, offset, data):
if noisy: self.log(".overwrite(%r, <data of length %r>)" % (offset, len(data)), level=NOISY)
- if offset > self.download_size and offset > self.current_size:
+ if offset > self.current_size:
# Normally writing at an offset beyond the current end-of-file
# would leave a hole that appears filled with zeroes. However, an
# EncryptedTemporaryFile doesn't behave like that (if there is a
# the gap between the current EOF and the offset.
self.f.seek(self.current_size)
- self.f.write("\x00" * (offset - self.current_size))
+ self.f.write("\x00" * (offset - self.current_size))
+ start = self.current_size
else:
self.f.seek(offset)
+ start = offset
+
self.f.write(data)
end = offset + len(data)
self.current_size = max(self.current_size, end)
if end > self.downloaded:
- heapq.heappush(self.overwrites, (offset, end))
+ heapq.heappush(self.overwrites, (start, end))
def read(self, offset, length):
"""When the data has been read, callback the Deferred that we return with this data.
The caller must perform no more overwrites until the Deferred has fired."""
if noisy: self.log(".read(%r, %r), current_size = %r" % (offset, length, self.current_size), level=NOISY)
+
+ # Note that the overwrite method is synchronous. When a write request is processed
+ # (e.g. a writeChunk request on the async queue of GeneralSFTPFile), overwrite will
+ # be called and will update self.current_size if necessary before returning. Therefore,
+ # self.current_size will be up-to-date for a subsequent call to this read method, and
+ # so it is correct to do the check for a read past the end-of-file here.
if offset >= self.current_size:
def _eof(): raise EOFError("read past end of file")
return defer.execute(_eof)
return self.done
def finish(self):
+ """Called by the producer when it has finished producing, or when we have
+ received enough bytes, or as a result of a close. Defined by IFinishableConsumer."""
+
while len(self.milestones) > 0:
(next, d) = self.milestones[0]
if noisy: self.log("MILESTONE FINISH %r %r" % (next, d), level=NOISY)
# The callback means that the milestone has been reached if
# it is ever going to be. Note that the file may have been
# truncated to before the milestone.
- eventually_callback(d)(None)
-
- # FIXME: causes spurious failures
- #self.unregisterProducer()
+ eventually(d.callback, None)
def close(self):
- self.is_closed = True
+ if not self.is_closed:
+ self.is_closed = True
+ try:
+ self.f.close()
+ except Exception, e:
+ self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD)
self.finish()
- self.f.close()
def unregisterProducer(self):
- if self.producer:
- self.producer.stopProducing()
- self.producer = None
+ pass
SIZE_THRESHOLD = 1000
implements(ISFTPFile)
"""I represent a file handle to a particular file on an SFTP connection.
I am used only for short immutable files opened in read-only mode.
- The file contents are downloaded to memory when I am created."""
+ When I am created, the file contents start to be downloaded to memory.
+ self.async is used to delay read requests until the download has finished."""
- def __init__(self, filenode, metadata):
- PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
- if noisy: self.log(".__init__(%r, %r)" % (filenode, metadata), level=NOISY)
+ def __init__(self, userpath, filenode, metadata):
+ PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
+ if noisy: self.log(".__init__(%r, %r, %r)" % (userpath, filenode, metadata), level=NOISY)
- assert IFileNode.providedBy(filenode), filenode
+ assert isinstance(userpath, str) and IFileNode.providedBy(filenode), (userpath, filenode)
self.filenode = filenode
self.metadata = metadata
self.async = download_to_data(filenode)
d = defer.Deferred()
def _read(data):
- if noisy: self.log("_read(%r) in readChunk(%r, %r)" % (data, offset, length), level=NOISY)
+ if noisy: self.log("_read(<data of length %r>) in readChunk(%r, %r)" % (len(data), offset, length), level=NOISY)
# "In response to this request, the server will read as many bytes as it
# can from the file (up to 'len'), and return them in a SSH_FXP_DATA
# i.e. we respond with an EOF error iff offset is already at EOF.
if offset >= len(data):
- eventually_errback(d)(SFTPError(FX_EOF, "read at or past end of file"))
+ eventually(d.errback, SFTPError(FX_EOF, "read at or past end of file"))
else:
- eventually_callback(d)(data[offset:min(offset+length, len(data))])
+ eventually(d.callback, data[offset:offset+length]) # truncated if offset+length > len(data)
return data
self.async.addCallbacks(_read, eventually_errback(d))
d.addBoth(_convert_error, request)
storing the file contents. In order to allow write requests to be satisfied
immediately, there is effectively a FIFO queue between requests made to this
file handle, and requests to my OverwriteableFileConsumer. This queue is
- implemented by the callback chain of self.async."""
+ implemented by the callback chain of self.async.
- def __init__(self, close_notify, check_abort, flags, convergence, parent=None, childname=None, filenode=None, metadata=None):
- PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
- if noisy: self.log(".__init__(%r, %r, %r, <convergence censored>, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
- (close_notify, check_abort, flags, parent, childname, filenode, metadata), level=NOISY)
+ When first constructed, I am in an 'unopened' state that causes most
+ operations to be delayed until 'open' is called."""
- self.close_notify = close_notify
- self.check_abort = check_abort
+ def __init__(self, userpath, flags, close_notify, convergence):
+ PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
+ if noisy: self.log(".__init__(%r, %r = %r, %r, <convergence censored>)" %
+ (userpath, flags, _repr_flags(flags), close_notify), level=NOISY)
+
+ assert isinstance(userpath, str), userpath
+ self.userpath = userpath
self.flags = flags
+ self.close_notify = close_notify
self.convergence = convergence
- self.parent = parent
- self.childname = childname
- self.filenode = filenode
- self.metadata = metadata
- self.async = defer.succeed(None)
+ self.async = defer.Deferred()
# Creating or truncating the file is a change, but if FXF_EXCL is set, a zero-length file has already been created.
self.has_changed = (flags & (FXF_CREAT | FXF_TRUNC)) and not (flags & FXF_EXCL)
self.closed = False
-
+ self.abandoned = False
+ self.parent = None
+ self.childname = None
+ self.filenode = None
+ self.metadata = None
+
# self.consumer should only be relied on in callbacks for self.async, since it might
# not be set before then.
self.consumer = None
+
+ def open(self, parent=None, childname=None, filenode=None, metadata=None):
+ self.log(".open(parent=%r, childname=%r, filenode=%r, metadata=%r)" %
+ (parent, childname, filenode, metadata), level=OPERATIONAL)
+
+ assert isinstance(childname, (unicode, NoneType)), childname
+ # If the file has been renamed, the new (parent, childname) takes precedence.
+ if self.parent is None:
+ self.parent = parent
+ if self.childname is None:
+ self.childname = childname
+ self.filenode = filenode
+ self.metadata = metadata
+
+ assert not self.closed, self
tempfile_maker = EncryptedTemporaryFile
- if (flags & FXF_TRUNC) or not filenode:
+ if (self.flags & FXF_TRUNC) or not filenode:
# We're either truncating or creating the file, so we don't need the old contents.
- self.consumer = OverwriteableFileConsumer(self.check_abort, 0, tempfile_maker)
+ self.consumer = OverwriteableFileConsumer(0, tempfile_maker)
self.consumer.finish()
else:
assert IFileNode.providedBy(filenode), filenode
- # TODO: use download interface described in #993 when implemented.
- if filenode.is_mutable():
- self.async.addCallback(lambda ign: filenode.download_best_version())
- def _downloaded(data):
- self.consumer = OverwriteableFileConsumer(self.check_abort, len(data), tempfile_maker)
- self.consumer.write(data)
- self.consumer.finish()
- return None
- self.async.addCallback(_downloaded)
- else:
- download_size = filenode.get_size()
- assert download_size is not None, "download_size is None"
- self.consumer = OverwriteableFileConsumer(self.check_abort, download_size, tempfile_maker)
- def _read(ign):
- if noisy: self.log("_read immutable", level=NOISY)
- filenode.read(self.consumer, 0, None)
- self.async.addCallback(_read)
+ self.async.addCallback(lambda ignored: filenode.get_best_readable_version())
+
+ def _read(version):
+ if noisy: self.log("_read", level=NOISY)
+ download_size = version.get_size()
+ assert download_size is not None
- if noisy: self.log("__init__ done", level=NOISY)
+ self.consumer = OverwriteableFileConsumer(download_size, tempfile_maker)
- def rename(self, new_parent, new_childname):
- self.log(".rename(%r, %r)" % (new_parent, new_childname), level=OPERATIONAL)
+ version.read(self.consumer, 0, None)
+ self.async.addCallback(_read)
+ eventually(self.async.callback, None)
+
+ if noisy: self.log("open done", level=NOISY)
+ return self
+
+ def get_userpath(self):
+ return self.userpath
+
+ def get_direntry(self):
+ return _direntry_for(self.parent, self.childname)
+
+ def rename(self, new_userpath, new_parent, new_childname):
+ self.log(".rename(%r, %r, %r)" % (new_userpath, new_parent, new_childname), level=OPERATIONAL)
+
+ assert isinstance(new_userpath, str) and isinstance(new_childname, unicode), (new_userpath, new_childname)
+ self.userpath = new_userpath
self.parent = new_parent
self.childname = new_childname
+ def abandon(self):
+ self.log(".abandon()", level=OPERATIONAL)
+
+ self.abandoned = True
+
+ def sync(self, ign=None):
+ # The ign argument allows some_file.sync to be used as a callback.
+ self.log(".sync()", level=OPERATIONAL)
+
+ d = defer.Deferred()
+ self.async.addBoth(eventually_callback(d))
+ def _done(res):
+ if noisy: self.log("_done(%r) in .sync()" % (res,), level=NOISY)
+ return res
+ d.addBoth(_done)
+ return d
+
def readChunk(self, offset, length):
request = ".readChunk(%r, %r)" % (offset, length)
self.log(request, level=OPERATIONAL)
def _read(ign):
if noisy: self.log("_read in readChunk(%r, %r)" % (offset, length), level=NOISY)
d2 = self.consumer.read(offset, length)
- d2.addErrback(_convert_error, request)
d2.addCallbacks(eventually_callback(d), eventually_errback(d))
# It is correct to drop d2 here.
return None
# Note that we return without waiting for the write to occur. Reads and
# close wait for prior writes, and will fail if any prior operation failed.
- # This is ok because SFTP makes no guarantee that the request completes
- # before the write. In fact it explicitly allows write errors to be delayed
- # until close:
+ # This is ok because SFTP makes no guarantee that the write completes
+ # before the request does. In fact it explicitly allows write errors to be
+ # delayed until close:
# "One should note that on some server platforms even a close can fail.
# This can happen e.g. if the server operating system caches writes,
# and an error occurs while flushing cached writes during the close."
self.closed = True
if not (self.flags & (FXF_WRITE | FXF_CREAT)):
- return defer.execute(self.consumer.close)
+ def _readonly_close():
+ if self.consumer:
+ self.consumer.close()
+ return defer.execute(_readonly_close)
+
+ # We must capture the abandoned, parent, and childname variables synchronously
+ # at the close call. This is needed by the correctness arguments in the comments
+ # for _abandon_any_heisenfiles and _rename_heisenfiles.
+ # Note that the file must have been opened before it can be closed.
+ abandoned = self.abandoned
+ parent = self.parent
+ childname = self.childname
+
+ # has_changed is set when writeChunk is called, not when the write occurs, so
+ # it is correct to optimize out the commit if it is False at the close call.
+ has_changed = self.has_changed
+
+ def _committed(res):
+ if noisy: self.log("_committed(%r)" % (res,), level=NOISY)
+
+ self.consumer.close()
+
+ # We must close_notify before re-firing self.async.
+ if self.close_notify:
+ self.close_notify(self.userpath, self.parent, self.childname, self)
+ return res
def _close(ign):
- d2 = defer.succeed(None)
- if self.has_changed:
- d2.addCallback(lambda ign: self.consumer.when_done())
- if self.filenode and self.filenode.is_mutable():
- d2.addCallback(lambda ign: self.consumer.get_current_size())
- d2.addCallback(lambda size: self.consumer.read(0, size))
- d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents))
- else:
- def _add_file(ign):
- self.log("_add_file childname=%r" % (self.childname,), level=OPERATIONAL)
- u = FileHandle(self.consumer.get_file(), self.convergence)
- return self.parent.add_file(self.childname, u)
- d2.addCallback(_add_file)
+ d2 = self.consumer.when_done()
+ if self.filenode and self.filenode.is_mutable():
+ self.log("update mutable file %r childname=%r metadata=%r" % (self.filenode, childname, self.metadata), level=OPERATIONAL)
+ if self.metadata.get('no-write', False) and not self.filenode.is_readonly():
+ assert parent and childname, (parent, childname, self.metadata)
+ d2.addCallback(lambda ign: parent.set_metadata_for(childname, self.metadata))
+
+ d2.addCallback(lambda ign: self.filenode.overwrite(MutableFileHandle(self.consumer.get_file())))
+ else:
+ def _add_file(ign):
+ self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL)
+ u = FileHandle(self.consumer.get_file(), self.convergence)
+ return parent.add_file(childname, u, metadata=self.metadata)
+ d2.addCallback(_add_file)
- d2.addCallback(lambda ign: self.consumer.close())
+ d2.addBoth(_committed)
return d2
- self.async.addCallback(_close)
d = defer.Deferred()
- self.async.addCallbacks(eventually_callback(d), eventually_errback(d))
- def _closed(res):
- self.close_notify(self.parent, self.childname, self)
- return res
- d.addBoth(_closed)
+ # If the file has been abandoned, we don't want the close operation to get "stuck",
+ # even if self.async fails to re-fire. Doing the close independently of self.async
+ # in that case ensures that dropping an ssh connection is sufficient to abandon
+ # any heisenfiles that were not explicitly closed in that connection.
+ if abandoned or not has_changed:
+ d.addCallback(_committed)
+ else:
+ self.async.addCallback(_close)
+
+ self.async.addCallbacks(eventually_callback(d), eventually_errback(d))
d.addBoth(_convert_error, request)
return d
return defer.execute(_closed)
# Optimization for read-only handles, when we already know the metadata.
- if not(self.flags & (FXF_WRITE | FXF_CREAT)) and self.metadata and self.filenode and not self.filenode.is_mutable():
+ if not (self.flags & (FXF_WRITE | FXF_CREAT)) and self.metadata and self.filenode and not self.filenode.is_mutable():
return defer.succeed(_populate_attrs(self.filenode, self.metadata))
d = defer.Deferred()
def _get(ign):
+ if noisy: self.log("_get(%r) in %r, filenode = %r, metadata = %r" % (ign, request, self.filenode, self.metadata), level=NOISY)
+
# self.filenode might be None, but that's ok.
attrs = _populate_attrs(self.filenode, self.metadata, size=self.consumer.get_current_size())
- eventually_callback(d)(attrs)
+ eventually(d.callback, attrs)
return None
self.async.addCallbacks(_get, eventually_errback(d))
d.addBoth(_convert_error, request)
return d
- def setAttrs(self, attrs):
- request = ".setAttrs(attrs) %r" % (attrs,)
+ def setAttrs(self, attrs, only_if_at=None):
+ request = ".setAttrs(%r, only_if_at=%r)" % (attrs, only_if_at)
self.log(request, level=OPERATIONAL)
if not (self.flags & FXF_WRITE):
def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot set attributes for a closed file handle")
return defer.execute(_closed)
- if not "size" in attrs:
- return defer.succeed(None)
-
- size = attrs["size"]
- if not isinstance(size, (int, long)) or size < 0:
+ size = attrs.get("size", None)
+ if size is not None and (not isinstance(size, (int, long)) or size < 0):
def _bad(): raise SFTPError(FX_BAD_MESSAGE, "new size is not a valid nonnegative integer")
return defer.execute(_bad)
d = defer.Deferred()
- def _resize(ign):
- self.consumer.set_current_size(size)
- eventually_callback(d)(None)
+ def _set(ign):
+ if noisy: self.log("_set(%r) in %r" % (ign, request), level=NOISY)
+ current_direntry = _direntry_for(self.parent, self.childname, self.filenode)
+ if only_if_at and only_if_at != current_direntry:
+ if noisy: self.log("not setting attributes: current_direntry=%r in %r" %
+ (current_direntry, request), level=NOISY)
+ return None
+
+ now = time()
+ self.metadata = update_metadata(self.metadata, _attrs_to_metadata(attrs), now)
+ if size is not None:
+ # TODO: should we refuse to truncate a file opened with FXF_APPEND?
+ # <http://allmydata.org/trac/tahoe-lafs/ticket/1037#comment:20>
+ self.consumer.set_current_size(size)
+ eventually(d.callback, None)
return None
- self.async.addCallbacks(_resize, eventually_errback(d))
+ self.async.addCallbacks(_set, eventually_errback(d))
d.addBoth(_convert_error, request)
return d
self.value = value
-# For each immutable file that has been opened with write flags
-# (FXF_WRITE and/or FXF_CREAT) and is still open, this maps from
-# parent_write_uri + "/" + childname_utf8, to (list_of_ISFTPFile, open_time_utc).
+# A "heisenfile" is a file that has been opened with write flags
+# (FXF_WRITE and/or FXF_CREAT) and not yet close-notified.
+# 'all_heisenfiles' maps from a direntry string to a list of
+# GeneralSFTPFile.
+#
+# A direntry string is parent_write_uri + "/" + childname_utf8 for
+# an immutable file, or file_write_uri for a mutable file.
# Updates to this dict are single-threaded.
-all_open_files = {}
+all_heisenfiles = {}
+
+def _reload():
+ global all_heisenfiles
+ all_heisenfiles = {}
class SFTPUserHandler(ConchUser, PrefixingLogMixin):
implements(ISFTPServer)
def __init__(self, client, rootnode, username):
ConchUser.__init__(self)
- PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
+ PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=username)
if noisy: self.log(".__init__(%r, %r, %r)" % (client, rootnode, username), level=NOISY)
self.channelLookup["session"] = session.SSHSession
self._root = rootnode
self._username = username
self._convergence = client.convergence
- self._logged_out = False
- self._open_files = {} # files created by this user handler and still open
+
+ # maps from UTF-8 paths for this user, to files written and still open
+ self._heisenfiles = {}
def gotVersion(self, otherVersion, extData):
self.log(".gotVersion(%r, %r)" % (otherVersion, extData), level=OPERATIONAL)
'fstatvfs@openssh.com': '2',
}
- def _add_open_files(self, direntry, files_to_add):
+ def logout(self):
+ self.log(".logout()", level=OPERATIONAL)
+
+ for files in self._heisenfiles.itervalues():
+ for f in files:
+ f.abandon()
+
+ def _add_heisenfile_by_path(self, file):
+ self.log("._add_heisenfile_by_path(%r)" % (file,), level=OPERATIONAL)
+
+ userpath = file.get_userpath()
+ if userpath in self._heisenfiles:
+ self._heisenfiles[userpath] += [file]
+ else:
+ self._heisenfiles[userpath] = [file]
+
+ def _add_heisenfile_by_direntry(self, file):
+ self.log("._add_heisenfile_by_direntry(%r)" % (file,), level=OPERATIONAL)
+
+ direntry = file.get_direntry()
if direntry:
- if direntry in self._open_files:
- self._open_files[direntry] += files_to_add
+ if direntry in all_heisenfiles:
+ all_heisenfiles[direntry] += [file]
else:
- self._open_files[direntry] = files_to_add
+ all_heisenfiles[direntry] = [file]
- if direntry in all_open_files:
- (old_files, opentime) = all_open_files[direntry]
- all_open_files[direntry] = (old_files + files_to_add, opentime)
- else:
- all_open_files[direntry] = (files_to_add, time())
+ def _abandon_any_heisenfiles(self, userpath, direntry):
+ request = "._abandon_any_heisenfiles(%r, %r)" % (userpath, direntry)
+ self.log(request, level=OPERATIONAL)
- def _remove_open_files(self, direntry, files_to_remove):
- if direntry and not self._logged_out:
- assert direntry in self._open_files, (direntry, self._open_files)
- assert direntry in all_open_files, (direntry, all_open_files)
+ assert isinstance(userpath, str), userpath
- old_files = self._open_files[direntry]
- new_files = [f for f in old_files if f not in files_to_remove]
- if len(new_files) > 0:
- self._open_files[direntry] = new_files
- else:
- del self._open_files[direntry]
+ # First we synchronously mark all heisenfiles matching the userpath or direntry
+ # as abandoned, and remove them from the two heisenfile dicts. Then we .sync()
+ # each file that we abandoned.
+ #
+ # For each file, the call to .abandon() occurs:
+ # * before the file is closed, in which case it will never be committed
+ # (uploaded+linked or published); or
+ # * after it is closed but before it has been close_notified, in which case the
+ # .sync() ensures that it has been committed (successfully or not) before we
+ # return.
+ #
+ # This avoids a race that might otherwise cause the file to be committed after
+ # the remove operation has completed.
+ #
+ # We return a Deferred that fires with True if any files were abandoned (this
+ # does not mean that they were not committed; it is used to determine whether
+ # a NoSuchChildError from the attempt to delete the file should be suppressed).
+
+ files = []
+ if direntry in all_heisenfiles:
+ files = all_heisenfiles[direntry]
+ del all_heisenfiles[direntry]
+ if userpath in self._heisenfiles:
+ files += self._heisenfiles[userpath]
+ del self._heisenfiles[userpath]
+
+ if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
+
+ for f in files:
+ f.abandon()
+
+ d = defer.succeed(None)
+ for f in files:
+ d.addBoth(f.sync)
+
+ def _done(ign):
+ self.log("done %r" % (request,), level=OPERATIONAL)
+ return len(files) > 0
+ d.addBoth(_done)
+ return d
- (all_old_files, opentime) = all_open_files[direntry]
- all_new_files = [f for f in all_old_files if f not in files_to_remove]
- if len(all_new_files) > 0:
- all_open_files[direntry] = (all_new_files, opentime)
- else:
- del all_open_files[direntry]
-
- def _rename_open_files(self, from_parent, from_childname, to_parent, to_childname):
- """When an direntry is renamed, any open files for that direntry are also renamed.
- Return True if there were any open files at from_direntry."""
-
- from_direntry = self._direntry_for(from_parent, from_childname)
- to_direntry = self._direntry_for(to_parent, to_childname)
-
- if from_direntry in all_open_files:
- (from_files, opentime) = all_open_files[from_direntry]
- del self._open_files[from_direntry]
- del all_open_files[from_direntry]
- for file in from_files:
- file.rename(to_parent, to_childname)
- self._add_open_files(to_direntry, from_files)
- return True
- else:
- return False
+ def _rename_heisenfiles(self, from_userpath, from_parent, from_childname,
+ to_userpath, to_parent, to_childname, overwrite=True):
+ request = ("._rename_heisenfiles(%r, %r, %r, %r, %r, %r, overwrite=%r)" %
+ (from_userpath, from_parent, from_childname, to_userpath, to_parent, to_childname, overwrite))
+ self.log(request, level=OPERATIONAL)
- def _direntry_for(self, parent, childname):
- if parent and childname:
- rw_uri = parent.get_write_uri()
- if rw_uri:
- return rw_uri + "/" + childname.encode('utf-8')
+ assert (isinstance(from_userpath, str) and isinstance(from_childname, unicode) and
+ isinstance(to_userpath, str) and isinstance(to_childname, unicode)), \
+ (from_userpath, from_childname, to_userpath, to_childname)
- return None
+ if noisy: self.log("all_heisenfiles = %r\nself._heisenfiles = %r" % (all_heisenfiles, self._heisenfiles), level=NOISY)
- def logout(self):
- if not self._logged_out:
- self._logged_out = True
- for (direntry, files_at_direntry) in enumerate(self._open_files):
- self._remove_open_files(direntry, files_at_direntry)
+ # First we synchronously rename all heisenfiles matching the userpath or direntry.
+ # Then we .sync() each file that we renamed.
+ #
+ # For each file, the call to .rename occurs:
+ # * before the file is closed, in which case it will be committed at the
+ # new direntry; or
+ # * after it is closed but before it has been close_notified, in which case the
+ # .sync() ensures that it has been committed (successfully or not) before we
+ # return.
+ #
+ # This avoids a race that might otherwise cause the file to be committed at the
+ # old name after the rename operation has completed.
+ #
+ # Note that if overwrite is False, the caller should already have checked
+ # whether a real direntry exists at the destination. It is possible that another
+ # direntry (heisen or real) comes to exist at the destination after that check,
+ # but in that case it is correct for the rename to succeed (and for the commit
+ # of the heisenfile at the destination to possibly clobber the other entry, since
+ # that can happen anyway when we have concurrent write handles to the same direntry).
+ #
+ # We return a Deferred that fires with True if any files were renamed (this
+ # does not mean that they were not committed; it is used to determine whether
+ # a NoSuchChildError from the rename attempt should be suppressed). If overwrite
+ # is False and there were already heisenfiles at the destination userpath or
+ # direntry, we return a Deferred that fails with SFTPError(FX_PERMISSION_DENIED).
+
+ from_direntry = _direntry_for(from_parent, from_childname)
+ to_direntry = _direntry_for(to_parent, to_childname)
+
+ if noisy: self.log("from_direntry = %r, to_direntry = %r, len(all_heisenfiles) = %r, len(self._heisenfiles) = %r in %r" %
+ (from_direntry, to_direntry, len(all_heisenfiles), len(self._heisenfiles), request), level=NOISY)
+
+ if not overwrite and (to_userpath in self._heisenfiles or to_direntry in all_heisenfiles):
+ def _existing(): raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
+ if noisy: self.log("existing", level=NOISY)
+ return defer.execute(_existing)
+
+ from_files = []
+ if from_direntry in all_heisenfiles:
+ from_files = all_heisenfiles[from_direntry]
+ del all_heisenfiles[from_direntry]
+ if from_userpath in self._heisenfiles:
+ from_files += self._heisenfiles[from_userpath]
+ del self._heisenfiles[from_userpath]
+
+ if noisy: self.log("from_files = %r in %r" % (from_files, request), level=NOISY)
+
+ for f in from_files:
+ f.rename(to_userpath, to_parent, to_childname)
+ self._add_heisenfile_by_path(f)
+ self._add_heisenfile_by_direntry(f)
+
+ d = defer.succeed(None)
+ for f in from_files:
+ d.addBoth(f.sync)
+
+ def _done(ign):
+ if noisy: self.log("done: len(all_heisenfiles) = %r, len(self._heisenfiles) = %r in %r" %
+ (len(all_heisenfiles), len(self._heisenfiles), request), level=NOISY)
+ return len(from_files) > 0
+ d.addBoth(_done)
+ return d
+
+ def _update_attrs_for_heisenfiles(self, userpath, direntry, attrs):
+ request = "._update_attrs_for_heisenfiles(%r, %r, %r)" % (userpath, direntry, attrs)
+ self.log(request, level=OPERATIONAL)
+
+ assert isinstance(userpath, str) and isinstance(direntry, str), (userpath, direntry)
- def _check_abort(self):
- return self._logged_out
+ files = []
+ if direntry in all_heisenfiles:
+ files = all_heisenfiles[direntry]
+ if userpath in self._heisenfiles:
+ files += self._heisenfiles[userpath]
- def _close_notify(self, parent, childname, f):
- self._remove_open_files(self._direntry_for(parent, childname), [f])
+ if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
- def _make_file(self, flags, parent=None, childname=None, filenode=None, metadata=None):
- if noisy: self.log("._make_file(%r = %r, parent=%r, childname=%r, filenode=%r, metadata=%r" %
- (flags, _repr_flags(flags), parent, childname, filenode, metadata), level=NOISY)
+ # We set the metadata for all heisenfiles at this path or direntry.
+ # Since a direntry includes a write URI, we must have authority to
+ # change the metadata of heisenfiles found in the all_heisenfiles dict.
+ # However that's not necessarily the case for heisenfiles found by
+ # path. Therefore we tell the setAttrs method of each file to only
+ # perform the update if the file is at the correct direntry.
+
+ d = defer.succeed(None)
+ for f in files:
+ d.addBoth(f.setAttrs, attrs, only_if_at=direntry)
+
+ def _done(ign):
+ self.log("done %r" % (request,), level=OPERATIONAL)
+ # TODO: this should not return True if only_if_at caused all files to be skipped.
+ return len(files) > 0
+ d.addBoth(_done)
+ return d
+
+ def _sync_heisenfiles(self, userpath, direntry, ignore=None):
+ request = "._sync_heisenfiles(%r, %r, ignore=%r)" % (userpath, direntry, ignore)
+ self.log(request, level=OPERATIONAL)
+
+ assert isinstance(userpath, str) and isinstance(direntry, (str, NoneType)), (userpath, direntry)
+
+ files = []
+ if direntry in all_heisenfiles:
+ files = all_heisenfiles[direntry]
+ if userpath in self._heisenfiles:
+ files += self._heisenfiles[userpath]
+
+ if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
+
+ d = defer.succeed(None)
+ for f in files:
+ if f is not ignore:
+ d.addBoth(f.sync)
+
+ def _done(ign):
+ self.log("done %r" % (request,), level=OPERATIONAL)
+ return None
+ d.addBoth(_done)
+ return d
+
+ def _remove_heisenfile(self, userpath, parent, childname, file_to_remove):
+ if noisy: self.log("._remove_heisenfile(%r, %r, %r, %r)" % (userpath, parent, childname, file_to_remove), level=NOISY)
+
+ assert isinstance(userpath, str) and isinstance(childname, (unicode, NoneType)), (userpath, childname)
+
+ direntry = _direntry_for(parent, childname)
+ if direntry in all_heisenfiles:
+ all_old_files = all_heisenfiles[direntry]
+ all_new_files = [f for f in all_old_files if f is not file_to_remove]
+ if len(all_new_files) > 0:
+ all_heisenfiles[direntry] = all_new_files
+ else:
+ del all_heisenfiles[direntry]
+
+ if userpath in self._heisenfiles:
+ old_files = self._heisenfiles[userpath]
+ new_files = [f for f in old_files if f is not file_to_remove]
+ if len(new_files) > 0:
+ self._heisenfiles[userpath] = new_files
+ else:
+ del self._heisenfiles[userpath]
+
+ if noisy: self.log("all_heisenfiles = %r\nself._heisenfiles = %r" % (all_heisenfiles, self._heisenfiles), level=NOISY)
+
+ def _make_file(self, existing_file, userpath, flags, parent=None, childname=None, filenode=None, metadata=None):
+ if noisy: self.log("._make_file(%r, %r, %r = %r, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
+ (existing_file, userpath, flags, _repr_flags(flags), parent, childname, filenode, metadata),
+ level=NOISY)
+
+ assert (isinstance(userpath, str) and isinstance(childname, (unicode, NoneType)) and
+ (metadata is None or 'no-write' in metadata)), (userpath, childname, metadata)
- assert metadata is None or 'readonly' in metadata, metadata
writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0
+ direntry = _direntry_for(parent, childname, filenode)
+
+ d = self._sync_heisenfiles(userpath, direntry, ignore=existing_file)
if not writing and (flags & FXF_READ) and filenode and not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
- return ShortReadOnlySFTPFile(filenode, metadata)
+ d.addCallback(lambda ign: ShortReadOnlySFTPFile(userpath, filenode, metadata))
else:
- direntry = None
+ close_notify = None
if writing:
- direntry = self._direntry_for(parent, childname)
-
- file = GeneralSFTPFile(self._close_notify, self._check_abort, flags, self._convergence,
- parent=parent, childname=childname, filenode=filenode, metadata=metadata)
- self._add_open_files(direntry, [file])
- return file
+ close_notify = self._remove_heisenfile
+
+ d.addCallback(lambda ign: existing_file or GeneralSFTPFile(userpath, flags, close_notify, self._convergence))
+ def _got_file(file):
+ file.open(parent=parent, childname=childname, filenode=filenode, metadata=metadata)
+ if writing:
+ self._add_heisenfile_by_direntry(file)
+ return file
+ d.addCallback(_got_file)
+ return d
- def openFile(self, pathstring, flags, attrs):
- request = ".openFile(%r, %r = %r, %r)" % (pathstring, flags, _repr_flags(flags), attrs)
+ def openFile(self, pathstring, flags, attrs, delay=None):
+ request = ".openFile(%r, %r = %r, %r, delay=%r)" % (pathstring, flags, _repr_flags(flags), attrs, delay)
self.log(request, level=OPERATIONAL)
# This is used for both reading and writing.
- # First exclude invalid combinations of flags.
-
- # /usr/bin/sftp 'get' gives us FXF_READ, while 'put' on a new file
- # gives FXF_WRITE | FXF_CREAT | FXF_TRUNC. I'm guessing that 'put' on an
- # existing file gives the same.
+ # First exclude invalid combinations of flags, and empty paths.
if not (flags & (FXF_READ | FXF_WRITE)):
- raise SFTPError(FX_BAD_MESSAGE,
- "invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set")
+ def _bad_readwrite():
+ raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set")
+ return defer.execute(_bad_readwrite)
if (flags & FXF_EXCL) and not (flags & FXF_CREAT):
- raise SFTPError(FX_BAD_MESSAGE,
- "invalid file open flags: FXF_EXCL cannot be set without FXF_CREAT")
+ def _bad_exclcreat():
+ raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: FXF_EXCL cannot be set without FXF_CREAT")
+ return defer.execute(_bad_exclcreat)
path = self._path_from_string(pathstring)
if not path:
- raise SFTPError(FX_NO_SUCH_FILE, "path cannot be empty")
+ def _emptypath(): raise SFTPError(FX_NO_SUCH_FILE, "path cannot be empty")
+ return defer.execute(_emptypath)
+
+ # The combination of flags is potentially valid.
+
+ # To work around clients that have race condition bugs, a getAttr, rename, or
+ # remove request following an 'open' request with FXF_WRITE or FXF_CREAT flags,
+ # should succeed even if the 'open' request has not yet completed. So we now
+ # synchronously add a file object into the self._heisenfiles dict, indexed
+ # by its UTF-8 userpath. (We can't yet add it to the all_heisenfiles dict,
+ # because we don't yet have a user-independent path for the file.) The file
+ # object does not know its filenode, parent, or childname at this point.
- # The combination of flags is potentially valid. Now there are two major cases:
+ userpath = self._path_to_utf8(path)
+
+ if flags & (FXF_WRITE | FXF_CREAT):
+ file = GeneralSFTPFile(userpath, flags, self._remove_heisenfile, self._convergence)
+ self._add_heisenfile_by_path(file)
+ else:
+ # We haven't decided which file implementation to use yet.
+ file = None
+
+ desired_metadata = _attrs_to_metadata(attrs)
+
+ # Now there are two major cases:
#
# 1. The path is specified as /uri/FILECAP, with no parent directory.
# If the FILECAP is mutable and writeable, then we can open it in write-only
# d. the child is immutable: if we are trying to open it write-only or
# read/write, then we must be able to write to the parent directory.
#
- # To reduce latency, open succeeds as soon as these conditions are met, even
- # though there might be a failure in downloading the existing file or uploading
- # a new one.
+ # To reduce latency, open normally succeeds as soon as these conditions are
+ # met, even though there might be a failure in downloading the existing file
+ # or uploading a new one. However, there is an exception: if a file has been
+ # written, then closed, and is now being reopened, then we have to delay the
+ # open until the previous upload/publish has completed. This is necessary
+ # because sshfs does not wait for the result of an FXF_CLOSE message before
+ # reporting to the client that a file has been closed. It applies both to
+ # mutable files, and to directory entries linked to an immutable file.
#
# Note that the permission checks below are for more precise error reporting on
# the open call; later operations would fail even if we did not make these checks.
- d = self._get_root(path)
+ d = delay or defer.succeed(None)
+ d.addCallback(lambda ign: self._get_root(path))
def _got_root( (root, path) ):
if root.is_unknown():
raise SFTPError(FX_PERMISSION_DENIED,
- "cannot open an unknown cap (or child of an unknown directory). "
+ "cannot open an unknown cap (or child of an unknown object). "
"Upgrading the gateway to a later Tahoe-LAFS version may help")
if not path:
# case 1
raise SFTPError(FX_FAILURE,
"cannot create a file exclusively when it already exists")
- # The file does not need to be added to all_open_files, because it is not
+ # The file does not need to be added to all_heisenfiles, because it is not
# associated with a directory entry that needs to be updated.
- return self._make_file(flags, filenode=root)
+ metadata = update_metadata(None, desired_metadata, time())
+
+ # We have to decide what to pass for the 'parent_readonly' argument to _no_write,
+ # given that we don't actually have a parent. This only affects the permissions
+ # reported by a getAttrs on this file handle in the case of an immutable file.
+ # We choose 'parent_readonly=True' since that will cause the permissions to be
+ # reported as r--r--r--, which is appropriate because an immutable file can't be
+ # written via this path.
+
+ metadata['no-write'] = _no_write(True, root)
+ return self._make_file(file, userpath, flags, filenode=root, metadata=metadata)
else:
# case 2
childname = path[-1]
- if noisy: self.log("case 2: root = %r, childname = %r, path[:-1] = %r" %
- (root, childname, path[:-1]), level=NOISY)
+
+ if noisy: self.log("case 2: root = %r, childname = %r, desired_metadata = %r, path[:-1] = %r" %
+ (root, childname, desired_metadata, path[:-1]), level=NOISY)
d2 = root.get_child_at_path(path[:-1])
def _got_parent(parent):
if noisy: self.log("_got_parent(%r)" % (parent,), level=NOISY)
if parent.is_unknown():
raise SFTPError(FX_PERMISSION_DENIED,
- "cannot open an unknown cap (or child of an unknown directory). "
+ "cannot open a child of an unknown object. "
"Upgrading the gateway to a later Tahoe-LAFS version may help")
parent_readonly = parent.is_readonly()
zero_length_lit = "URI:LIT:"
if noisy: self.log("%r.set_uri(%r, None, readcap=%r, overwrite=False)" %
(parent, zero_length_lit, childname), level=NOISY)
- d3.addCallback(lambda ign: parent.set_uri(childname, None, readcap=zero_length_lit, overwrite=False))
+ d3.addCallback(lambda ign: parent.set_uri(childname, None, readcap=zero_length_lit,
+ metadata=desired_metadata, overwrite=False))
def _seturi_done(child):
if noisy: self.log("%r.get_metadata_for(%r)" % (parent, childname), level=NOISY)
d4 = parent.get_metadata_for(childname)
if noisy: self.log("%r.get_child_and_metadata(%r)" % (parent, childname), level=NOISY)
d3.addCallback(lambda ign: parent.get_child_and_metadata(childname))
- def _got_child( (filenode, metadata) ):
- if noisy: self.log("_got_child( (%r, %r) )" % (filenode, metadata), level=NOISY)
+ def _got_child( (filenode, current_metadata) ):
+ if noisy: self.log("_got_child( (%r, %r) )" % (filenode, current_metadata), level=NOISY)
+
+ metadata = update_metadata(current_metadata, desired_metadata, time())
+
+ # Ignore the permissions of the desired_metadata in an open call. The permissions
+ # can only be set by setAttrs.
+ metadata['no-write'] = _no_write(parent_readonly, filenode, current_metadata)
if filenode.is_unknown():
raise SFTPError(FX_PERMISSION_DENIED,
if not IFileNode.providedBy(filenode):
raise SFTPError(FX_PERMISSION_DENIED,
"cannot open a directory as if it were a file")
- if (flags & FXF_WRITE) and filenode.is_mutable() and filenode.is_readonly():
- raise SFTPError(FX_PERMISSION_DENIED,
- "cannot open a read-only mutable file for writing")
- if (flags & FXF_WRITE) and parent_readonly:
+ if (flags & FXF_WRITE) and metadata['no-write']:
raise SFTPError(FX_PERMISSION_DENIED,
- "cannot open a file for writing when the parent directory is read-only")
+ "cannot open a non-writeable file for writing")
- metadata['readonly'] = _is_readonly(parent_readonly, filenode)
- return self._make_file(flags, parent=parent, childname=childname, filenode=filenode, metadata=metadata)
+ return self._make_file(file, userpath, flags, parent=parent, childname=childname,
+ filenode=filenode, metadata=metadata)
def _no_child(f):
if noisy: self.log("_no_child(%r)" % (f,), level=NOISY)
f.trap(NoSuchChildError)
raise SFTPError(FX_PERMISSION_DENIED,
"cannot create a file when the parent directory is read-only")
- return self._make_file(flags, parent=parent, childname=childname)
+ return self._make_file(file, userpath, flags, parent=parent, childname=childname)
d3.addCallbacks(_got_child, _no_child)
return d3
return d2
d.addCallback(_got_root)
+ def _remove_on_error(err):
+ if file:
+ self._remove_heisenfile(userpath, None, None, file)
+ return err
+ d.addErrback(_remove_on_error)
d.addBoth(_convert_error, request)
return d
from_path = self._path_from_string(from_pathstring)
to_path = self._path_from_string(to_pathstring)
+ from_userpath = self._path_to_utf8(from_path)
+ to_userpath = self._path_to_utf8(to_path)
# the target directory must already exist
d = deferredutil.gatherResults([self._get_parent_or_node(from_path),
self._get_parent_or_node(to_path)])
def _got( (from_pair, to_pair) ):
- if noisy: self.log("_got( (%r, %r) ) in .renameFile(%r, %r)" %
- (from_pair, to_pair, from_pathstring, to_pathstring), level=NOISY)
+ if noisy: self.log("_got( (%r, %r) ) in .renameFile(%r, %r, overwrite=%r)" %
+ (from_pair, to_pair, from_pathstring, to_pathstring, overwrite), level=NOISY)
(from_parent, from_childname) = from_pair
(to_parent, to_childname) = to_pair
-
+
if from_childname is None:
raise SFTPError(FX_NO_SUCH_FILE, "cannot rename a source object specified by URI")
if to_childname is None:
# <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.5>
# "It is an error if there already exists a file with the name specified
# by newpath."
+ # OpenSSH's SFTP server returns FX_PERMISSION_DENIED for this error.
+ #
# For the standard SSH_FXP_RENAME operation, overwrite=False.
# We also support the posix-rename@openssh.com extension, which uses overwrite=True.
- # FIXME: use move_child_to_path to avoid possible data loss due to #943
- #d2 = from_parent.move_child_to_path(from_childname, to_root, to_path, overwrite=overwrite)
-
- d2 = from_parent.move_child_to(from_childname, to_parent, to_childname, overwrite=overwrite)
- def _check(err):
- if noisy: self.log("_check(%r) in .renameFile(%r, %r)" %
- (err, from_pathstring, to_pathstring), level=NOISY)
-
- if not isinstance(err, Failure) or err.check(NoSuchChildError):
- # If there are open files to be written at the 'from' direntry, then ensure
- # they will now be written at the 'to' direntry instead.
- if noisy: self.log("checking open files:\nself._open_files = %r\nall_open_files = %r" %
- (self._open_files, all_open_files), level=NOISY)
- if self._rename_open_files(from_parent, from_childname, to_parent, to_childname):
- # suppress the NoSuchChildError if any open files were renamed
- if noisy: self.log("after renaming:\nself._open_files = %r\nall_open_files = %r" %
- (self._open_files, all_open_files), level=NOISY)
+ d2 = defer.succeed(None)
+ if not overwrite:
+ d2.addCallback(lambda ign: to_parent.get(to_childname))
+ def _expect_fail(res):
+ if not isinstance(res, Failure):
+ raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
+
+ # It is OK if we fail for errors other than NoSuchChildError, since that probably
+ # indicates some problem accessing the destination directory.
+ res.trap(NoSuchChildError)
+ d2.addBoth(_expect_fail)
+
+ # If there are heisenfiles to be written at the 'from' direntry, then ensure
+ # they will now be written at the 'to' direntry instead.
+ d2.addCallback(lambda ign:
+ self._rename_heisenfiles(from_userpath, from_parent, from_childname,
+ to_userpath, to_parent, to_childname, overwrite=overwrite))
+
+ def _move(renamed):
+ # FIXME: use move_child_to_path to avoid possible data loss due to #943
+ #d3 = from_parent.move_child_to_path(from_childname, to_root, to_path, overwrite=overwrite)
+
+ d3 = from_parent.move_child_to(from_childname, to_parent, to_childname, overwrite=overwrite)
+ def _check(err):
+ if noisy: self.log("_check(%r) in .renameFile(%r, %r, overwrite=%r)" %
+ (err, from_pathstring, to_pathstring, overwrite), level=NOISY)
+
+ if not isinstance(err, Failure) or (renamed and err.check(NoSuchChildError)):
return None
- elif err.check(ExistingChildError):
- # OpenSSH SFTP server returns FX_PERMISSION_DENIED
- raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_pathstring)
+ if not overwrite and err.check(ExistingChildError):
+ raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
- return err
- d2.addBoth(_check)
+ return err
+ d3.addBoth(_check)
+ return d3
+ d2.addCallback(_move)
return d2
d.addCallback(_got)
d.addBoth(_convert_error, request)
self.log(request, level=OPERATIONAL)
path = self._path_from_string(pathstring)
- metadata = self._attrs_to_metadata(attrs)
+ metadata = _attrs_to_metadata(attrs)
+ if 'no-write' in metadata:
+ def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "cannot create a directory that is initially read-only")
+ return defer.execute(_denied)
+
d = self._get_root(path)
d.addCallback(lambda (root, path):
self._get_or_create_directories(root, path, metadata))
return d
def _remove_object(self, path, must_be_directory=False, must_be_file=False):
- d = defer.maybeDeferred(self._get_parent_or_node, path)
+ userpath = self._path_to_utf8(path)
+ d = self._get_parent_or_node(path)
def _got_parent( (parent, childname) ):
- # FIXME (minor): there is a race condition between the 'get' and 'delete',
- # so it is possible that the must_be_directory or must_be_file restrictions
- # might not be enforced correctly if the type has just changed.
-
if childname is None:
- raise SFTPError(FX_NO_SUCH_FILE, "cannot delete an object specified by URI")
-
- d2 = parent.get(childname)
- def _got_child(child):
- # Unknown children can be removed by either removeFile or removeDirectory.
- if must_be_directory and IFileNode.providedBy(child):
- raise SFTPError(FX_PERMISSION_DENIED, "rmdir called on a file")
- if must_be_file and IDirectoryNode.providedBy(child):
- raise SFTPError(FX_PERMISSION_DENIED, "rmfile called on a directory")
- return parent.delete(childname)
- d2.addCallback(_got_child)
+ raise SFTPError(FX_NO_SUCH_FILE, "cannot remove an object specified by URI")
+
+ direntry = _direntry_for(parent, childname)
+ d2 = defer.succeed(False)
+ if not must_be_directory:
+ d2.addCallback(lambda ign: self._abandon_any_heisenfiles(userpath, direntry))
+
+ d2.addCallback(lambda abandoned:
+ parent.delete(childname, must_exist=not abandoned,
+ must_be_directory=must_be_directory, must_be_file=must_be_file))
return d2
d.addCallback(_got_parent)
return d
results = []
for filename, (child, metadata) in children.iteritems():
# The file size may be cached or absent.
- metadata['readonly'] = _is_readonly(parent_readonly, child)
+ metadata['no-write'] = _no_write(parent_readonly, child, metadata)
attrs = _populate_attrs(child, metadata)
filename_utf8 = filename.encode('utf-8')
longname = _lsLine(filename_utf8, attrs)
# information isn't currently stored in mutable shares, I think.
path = self._path_from_string(pathstring)
+ userpath = self._path_to_utf8(path)
d = self._get_parent_or_node(path)
def _got_parent_or_node( (parent_or_node, childname) ):
if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)
+
+ # Some clients will incorrectly try to get the attributes
+ # of a file immediately after opening it, before it has been put
+ # into the all_heisenfiles table. This is a race condition bug in
+ # the client, but we handle it anyway by calling .sync() on all
+ # files matching either the path or the direntry.
+
+ direntry = _direntry_for(parent_or_node, childname)
+ d2 = self._sync_heisenfiles(userpath, direntry)
+
if childname is None:
node = parent_or_node
- d2 = node.get_current_size()
+ d2.addCallback(lambda ign: node.get_current_size())
d2.addCallback(lambda size:
- _populate_attrs(node, {'readonly': node.is_unknown() or node.is_readonly()}, size=size))
- return d2
+ _populate_attrs(node, {'no-write': node.is_unknown() or node.is_readonly()}, size=size))
else:
parent = parent_or_node
- d2 = parent.get_child_and_metadata_at_path([childname])
+ d2.addCallback(lambda ign: parent.get_child_and_metadata_at_path([childname]))
def _got( (child, metadata) ):
if noisy: self.log("_got( (%r, %r) )" % (child, metadata), level=NOISY)
assert IDirectoryNode.providedBy(parent), parent
- metadata['readonly'] = _is_readonly(parent.is_readonly(), child)
+ metadata['no-write'] = _no_write(parent.is_readonly(), child, metadata)
d3 = child.get_current_size()
d3.addCallback(lambda size: _populate_attrs(child, metadata, size=size))
return d3
def _nosuch(err):
if noisy: self.log("_nosuch(%r)" % (err,), level=NOISY)
err.trap(NoSuchChildError)
- direntry = self._direntry_for(parent, childname)
- if noisy: self.log("checking open files:\nself._open_files = %r\nall_open_files = %r\ndirentry=%r" %
- (self._open_files, all_open_files, direntry), level=NOISY)
- if direntry in all_open_files:
- (files, opentime) = all_open_files[direntry]
- sftptime = _to_sftp_time(opentime)
- # A file that has been opened for writing necessarily has permissions rw-rw-rw-.
- return {'permissions': S_IFREG | 0666,
- 'size': 0,
- 'createtime': sftptime,
- 'ctime': sftptime,
- 'mtime': sftptime,
- 'atime': sftptime,
- }
+ if noisy: self.log("checking open files:\nself._heisenfiles = %r\nall_heisenfiles = %r\ndirentry=%r" %
+ (self._heisenfiles, all_heisenfiles, direntry), level=NOISY)
+ if direntry in all_heisenfiles:
+ files = all_heisenfiles[direntry]
+ if len(files) == 0: # pragma: no cover
+ return err
+ # use the heisenfile that was most recently opened
+ return files[-1].getAttrs()
return err
d2.addCallbacks(_got, _nosuch)
- return d2
+ return d2
d.addCallback(_got_parent_or_node)
d.addBoth(_convert_error, request)
return d
def setAttrs(self, pathstring, attrs):
- self.log(".setAttrs(%r, %r)" % (pathstring, attrs), level=OPERATIONAL)
+ request = ".setAttrs(%r, %r)" % (pathstring, attrs)
+ self.log(request, level=OPERATIONAL)
if "size" in attrs:
# this would require us to download and re-upload the truncated/extended
# file contents
- def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "setAttrs wth size attribute")
+ def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "setAttrs with size attribute unsupported")
return defer.execute(_unsupported)
- return defer.succeed(None)
+
+ path = self._path_from_string(pathstring)
+ userpath = self._path_to_utf8(path)
+ d = self._get_parent_or_node(path)
+ def _got_parent_or_node( (parent_or_node, childname) ):
+ if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)
+
+ direntry = _direntry_for(parent_or_node, childname)
+ d2 = self._update_attrs_for_heisenfiles(userpath, direntry, attrs)
+
+ def _update(updated_heisenfiles):
+ if childname is None:
+ if updated_heisenfiles:
+ return None
+ raise SFTPError(FX_NO_SUCH_FILE, userpath)
+ else:
+ desired_metadata = _attrs_to_metadata(attrs)
+ if noisy: self.log("desired_metadata = %r" % (desired_metadata,), level=NOISY)
+
+ d3 = parent_or_node.set_metadata_for(childname, desired_metadata)
+ def _nosuch(err):
+ if updated_heisenfiles:
+ err.trap(NoSuchChildError)
+ else:
+ return err
+ d3.addErrback(_nosuch)
+ return d3
+ d2.addCallback(_update)
+ d2.addCallback(lambda ign: None)
+ return d2
+ d.addCallback(_got_parent_or_node)
+ d.addBoth(_convert_error, request)
+ return d
def readLink(self, pathstring):
self.log(".readLink(%r)" % (pathstring,), level=OPERATIONAL)
if extensionName == 'posix-rename@openssh.com':
def _bad(): raise SFTPError(FX_BAD_MESSAGE, "could not parse posix-rename@openssh.com request")
+ if 4 > len(extensionData): return defer.execute(_bad)
(fromPathLen,) = struct.unpack('>L', extensionData[0:4])
if 8 + fromPathLen > len(extensionData): return defer.execute(_bad)
fromPathstring = extensionData[4:(4 + fromPathLen)]
toPathstring = extensionData[(8 + fromPathLen):]
d = self.renameFile(fromPathstring, toPathstring, overwrite=True)
- d.addCallback(lambda ign: "")
+
+ # Twisted conch assumes that the response from an extended request is either
+ # an error, or an FXP_EXTENDED_REPLY. But it happens to do the right thing
+ # (respond with an FXP_STATUS message) if we return a Failure with code FX_OK.
+ def _succeeded(ign):
+ raise SFTPError(FX_OK, "request succeeded")
+ d.addCallback(_succeeded)
return d
if extensionName == 'statvfs@openssh.com' or extensionName == 'fstatvfs@openssh.com':
+ # f_bsize and f_frsize should be the same to avoid a bug in 'df'
return defer.succeed(struct.pack('>11Q',
1024, # uint64 f_bsize /* file system block size */
1024, # uint64 f_frsize /* fundamental fs block size */
def realPath(self, pathstring):
self.log(".realPath(%r)" % (pathstring,), level=OPERATIONAL)
- path_utf8 = [p.encode('utf-8') for p in self._path_from_string(pathstring)]
- return "/" + "/".join(path_utf8)
+ return self._path_to_utf8(self._path_from_string(pathstring))
+
+ def _path_to_utf8(self, path):
+ return (u"/" + u"/".join(path)).encode('utf-8')
def _path_from_string(self, pathstring):
if noisy: self.log("CONVERT %r" % (pathstring,), level=NOISY)
+ assert isinstance(pathstring, str), pathstring
+
# The home directory is the root directory.
pathstring = pathstring.strip("/")
if pathstring == "" or pathstring == ".":
def _get_root(self, path):
# return Deferred (root, remaining_path)
+ d = defer.succeed(None)
if path and path[0] == u"uri":
- d = defer.maybeDeferred(self._client.create_node_from_uri, path[1].encode('utf-8'))
+ d.addCallback(lambda ign: self._client.create_node_from_uri(path[1].encode('utf-8')))
d.addCallback(lambda root: (root, path[2:]))
else:
- d = defer.succeed((self._root, path))
+ d.addCallback(lambda ign: (self._root, path))
return d
def _get_parent_or_node(self, path):
d.addCallback(_got_root)
return d
- def _attrs_to_metadata(self, attrs):
- metadata = {}
- for key in attrs:
- if key == "mtime" or key == "ctime" or key == "createtime":
- metadata[key] = long(attrs[key])
- elif key.startswith("ext_"):
- metadata[key] = str(attrs[key])
+class FakeTransport:
+ implements(ITransport)
+ def write(self, data):
+ logmsg("FakeTransport.write(<data of length %r>)" % (len(data),), level=NOISY)
- return metadata
+ def writeSequence(self, data):
+ logmsg("FakeTransport.writeSequence(...)", level=NOISY)
+ def loseConnection(self):
+ logmsg("FakeTransport.loseConnection()", level=NOISY)
-class SFTPUser(ConchUser, PrefixingLogMixin):
- implements(ISession)
- def __init__(self, check_abort, client, rootnode, username, convergence):
- ConchUser.__init__(self)
- PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
+ # getPeer and getHost can just raise errors, since we don't know what to return
- self.channelLookup["session"] = session.SSHSession
- self.subsystemLookup["sftp"] = FileTransferServer
- self.check_abort = check_abort
- self.client = client
- self.root = rootnode
- self.username = username
- self.convergence = convergence
+class ShellSession(PrefixingLogMixin):
+ implements(ISession)
+ def __init__(self, userHandler):
+ PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
+ if noisy: self.log(".__init__(%r)" % (userHandler,), level=NOISY)
def getPty(self, terminal, windowSize, attrs):
self.log(".getPty(%r, %r, %r)" % (terminal, windowSize, attrs), level=OPERATIONAL)
- raise NotImplementedError
def openShell(self, protocol):
self.log(".openShell(%r)" % (protocol,), level=OPERATIONAL)
- raise NotImplementedError
+ if hasattr(protocol, 'transport') and protocol.transport is None:
+ protocol.transport = FakeTransport() # work around Twisted bug
+
+ return self._unsupported(protocol)
def execCommand(self, protocol, cmd):
self.log(".execCommand(%r, %r)" % (protocol, cmd), level=OPERATIONAL)
- raise NotImplementedError
+ if hasattr(protocol, 'transport') and protocol.transport is None:
+ protocol.transport = FakeTransport() # work around Twisted bug
+
+ d = defer.succeed(None)
+ if cmd == "df -P -k /":
+ d.addCallback(lambda ign: protocol.write(
+ "Filesystem 1024-blocks Used Available Capacity Mounted on\r\n"
+ "tahoe 628318530 314159265 314159265 50% /\r\n"))
+ d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessDone(None))))
+ else:
+ d.addCallback(lambda ign: self._unsupported(protocol))
+ return d
+
+ def _unsupported(self, protocol):
+ d = defer.succeed(None)
+ d.addCallback(lambda ign: protocol.errReceived(
+ "This server supports only the SFTP protocol. It does not support SCP,\r\n"
+ "interactive shell sessions, or commands other than one needed by sshfs.\r\n"))
+ d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessTerminated(exitCode=1))))
+ return d
def windowChanged(self, newWindowSize):
self.log(".windowChanged(%r)" % (newWindowSize,), level=OPERATIONAL)
- def eofReceived():
+ def eofReceived(self):
self.log(".eofReceived()", level=OPERATIONAL)
def closed(self):
self.log(".closed()", level=OPERATIONAL)
-# if you have an SFTPUser, and you want something that provides ISFTPServer,
-# then you get SFTPHandler(user)
-components.registerAdapter(SFTPHandler, SFTPUser, ISFTPServer)
+# If you have an SFTPUserHandler and want something that provides ISession, you get
+# ShellSession(userHandler).
+# We use adaptation because this must be a different object to the SFTPUserHandler.
+components.registerAdapter(ShellSession, SFTPUserHandler, ISession)
+
-from auth import AccountURLChecker, AccountFileChecker, NeedRootcapLookupScheme
+from allmydata.frontends.auth import AccountURLChecker, AccountFileChecker, NeedRootcapLookupScheme
class Dispatcher:
implements(portal.IRealm)