-import os, tempfile, heapq, binascii, traceback, array, stat, struct
+import heapq, traceback, array, stat, struct
+from types import NoneType
from stat import S_IFREG, S_IFDIR
from time import time, strftime, localtime
from twisted.internet.interfaces import ITransport
from twisted.internet import defer
-from twisted.internet.interfaces import IFinishableConsumer
+from twisted.internet.interfaces import IConsumer
from foolscap.api import eventually
from allmydata.util import deferredutil
+from allmydata.util.assertutil import _assert, precondition
from allmydata.util.consumer import download_to_data
from allmydata.interfaces import IFileNode, IDirectoryNode, ExistingChildError, \
NoSuchChildError, ChildOfWrongTypeError
from allmydata.mutable.common import NotWriteableError
+from allmydata.mutable.publish import MutableFileHandle
from allmydata.immutable.upload import FileHandle
from allmydata.dirnode import update_metadata
-
-from pycryptopp.cipher.aes import AES
+from allmydata.util.fileutil import EncryptedTemporaryFile
noisy = True
use_foolscap_logging = True
def _convert_error(res, request):
+    """If res is not a Failure, return it; otherwise, reraise the appropriate
+    SFTPError."""
+
if not isinstance(res, Failure):
logged_res = res
if isinstance(res, str): logged_res = "<data of length %r>" % (len(res),)
logmsg("RAISE %r %r" % (request, err.value), level=OPERATIONAL)
try:
if noisy: logmsg(traceback.format_exc(err.value), level=NOISY)
- except: # pragma: no cover
+ except Exception: # pragma: no cover
pass
# The message argument to SFTPError must not reveal information that
- # might compromise anonymity.
+ # might compromise anonymity, if we are running over an anonymous network.
if err.check(SFTPError):
# original raiser of SFTPError has responsibility to ensure anonymity
st_gid = "tahoe"
st_mtime = attrs.get("mtime", 0)
st_mode = attrs["permissions"]
- # TODO: check that clients are okay with this being a "?".
- # (They should be because the longname is intended for human
- # consumption.)
- st_size = attrs.get("size", "?")
+
+ # Some clients won't tolerate '?' in the size field (#1337).
+ st_size = attrs.get("size", 0)
+
# We don't know how many links there really are to this object.
st_nlink = 1
- # Based on <http://twistedmatrix.com/trac/browser/trunk/twisted/conch/ls.py?rev=25412>.
- # We can't call the version in Twisted because we might have a version earlier than
- # <http://twistedmatrix.com/trac/changeset/25412> (released in Twisted 8.2).
+ # Based on <https://twistedmatrix.com/trac/browser/trunk/twisted/conch/ls.py?rev=25412>.
+ # We previously could not call the version in Twisted because we needed the change
+ # <https://twistedmatrix.com/trac/changeset/25412> (released in Twisted v8.2).
+ # Since we now depend on Twisted v10.1, consider calling Twisted's version.
mode = st_mode
perms = array.array('c', '-'*10)
return l
-def _no_write(parent_readonly, child, metadata):
+def _no_write(parent_readonly, child, metadata=None):
"""Whether child should be listed as having read-only permissions in parent."""
if child.is_unknown():
elif parent_readonly or IDirectoryNode.providedBy(child):
return True
else:
- return metadata.get('no-write', False)
+ return metadata is not None and metadata.get('no-write', False)
def _populate_attrs(childnode, metadata, size=None):
if childnode and size is None:
size = childnode.get_size()
if size is not None:
- assert isinstance(size, (int, long)) and not isinstance(size, bool), repr(size)
+ _assert(isinstance(size, (int, long)) and not isinstance(size, bool), size=size)
attrs['size'] = size
perms = S_IFREG | 0666
if metadata.get('no-write', False):
perms &= S_IFDIR | S_IFREG | 0555 # clear 'w' bits
- # see webapi.txt for what these times mean
+ # See webapi.txt for what these times mean.
+ # We would prefer to omit atime, but SFTP version 3 can only
+ # accept mtime if atime is also set.
if 'linkmotime' in metadata.get('tahoe', {}):
- attrs['mtime'] = _to_sftp_time(metadata['tahoe']['linkmotime'])
+ attrs['ctime'] = attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['tahoe']['linkmotime'])
elif 'mtime' in metadata:
- # We would prefer to omit atime, but SFTP version 3 can only
- # accept mtime if atime is also set.
- attrs['mtime'] = _to_sftp_time(metadata['mtime'])
- attrs['atime'] = attrs['mtime']
+ attrs['ctime'] = attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['mtime'])
if 'linkcrtime' in metadata.get('tahoe', {}):
attrs['createtime'] = _to_sftp_time(metadata['tahoe']['linkcrtime'])
- if 'ctime' in metadata:
- attrs['ctime'] = _to_sftp_time(metadata['ctime'])
-
attrs['permissions'] = perms
# twisted.conch.ssh.filetransfer only implements SFTP version 3,
def _direntry_for(filenode_or_parent, childname, filenode=None):
+ precondition(isinstance(childname, (unicode, NoneType)), childname=childname)
+
if childname is None:
filenode_or_parent = filenode
return None
-class EncryptedTemporaryFile(PrefixingLogMixin):
- # not implemented: next, readline, readlines, xreadlines, writelines
-
- def __init__(self):
- PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
- self.file = tempfile.TemporaryFile()
- self.key = os.urandom(16) # AES-128
-
- def _crypt(self, offset, data):
- # TODO: use random-access AES (pycryptopp ticket #18)
- offset_big = offset // 16
- offset_small = offset % 16
- iv = binascii.unhexlify("%032x" % offset_big)
- cipher = AES(self.key, iv=iv)
- cipher.process("\x00"*offset_small)
- return cipher.process(data)
-
- def close(self):
- self.file.close()
-
- def flush(self):
- self.file.flush()
-
- def seek(self, offset, whence=os.SEEK_SET):
- if noisy: self.log(".seek(%r, %r)" % (offset, whence), level=NOISY)
- self.file.seek(offset, whence)
-
- def tell(self):
- offset = self.file.tell()
- if noisy: self.log(".tell() = %r" % (offset,), level=NOISY)
- return offset
-
- def read(self, size=-1):
- if noisy: self.log(".read(%r)" % (size,), level=NOISY)
- index = self.file.tell()
- ciphertext = self.file.read(size)
- plaintext = self._crypt(index, ciphertext)
- return plaintext
-
- def write(self, plaintext):
- if noisy: self.log(".write(<data of length %r>)" % (len(plaintext),), level=NOISY)
- index = self.file.tell()
- ciphertext = self._crypt(index, plaintext)
- self.file.write(ciphertext)
-
- def truncate(self, newsize):
- if noisy: self.log(".truncate(%r)" % (newsize,), level=NOISY)
- self.file.truncate(newsize)
-
-
class OverwriteableFileConsumer(PrefixingLogMixin):
- implements(IFinishableConsumer)
+ implements(IConsumer)
"""I act both as a consumer for the download of the original file contents, and as a
wrapper for a temporary file that records the downloaded data and any overwrites.
I use a priority queue to keep track of which regions of the file have been overwritten
self.milestones = [] # empty heap of (offset, d)
self.overwrites = [] # empty heap of (start, end)
self.is_closed = False
- self.done = self.when_reached(download_size) # adds a milestone
- self.is_done = False
- def _signal_done(ign):
- if noisy: self.log("DONE", level=NOISY)
- self.is_done = True
- self.done.addCallback(_signal_done)
+
+ self.done = defer.Deferred()
+ self.done_status = None # None -> not complete, Failure -> download failed, str -> download succeeded
self.producer = None
def get_file(self):
self.overwrite(self.current_size, "\x00" * (size - self.current_size))
self.current_size = size
- # invariant: self.download_size <= self.current_size
+    # restore the invariant self.download_size <= self.current_size
if size < self.download_size:
self.download_size = size
+
if self.downloaded >= self.download_size:
- self.finish()
+ self.download_done("size changed")
def registerProducer(self, p, streaming):
if noisy: self.log(".registerProducer(%r, streaming=%r)" % (p, streaming), level=NOISY)
+ if self.producer is not None:
+ raise RuntimeError("producer is already registered")
+
self.producer = p
if streaming:
# call resumeProducing once to start things off
p.resumeProducing()
else:
def _iterate():
- if not self.is_done:
+ if self.done_status is None:
p.resumeProducing()
eventually(_iterate)
_iterate()
return
if noisy: self.log("MILESTONE %r %r" % (next, d), level=NOISY)
heapq.heappop(self.milestones)
- eventually_callback(d)(None)
+ eventually_callback(d)("reached")
if milestone >= self.download_size:
- self.finish()
+ self.download_done("reached download size")
def overwrite(self, offset, data):
if noisy: self.log(".overwrite(%r, <data of length %r>)" % (offset, len(data)), level=NOISY)
+ if self.is_closed:
+ self.log("overwrite called on a closed OverwriteableFileConsumer", level=WEIRD)
+ raise SFTPError(FX_BAD_MESSAGE, "cannot write to a closed file handle")
+
if offset > self.current_size:
# Normally writing at an offset beyond the current end-of-file
# would leave a hole that appears filled with zeroes. However, an
The caller must perform no more overwrites until the Deferred has fired."""
if noisy: self.log(".read(%r, %r), current_size = %r" % (offset, length, self.current_size), level=NOISY)
+ if self.is_closed:
+ self.log("read called on a closed OverwriteableFileConsumer", level=WEIRD)
+ raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
+
+ # Note that the overwrite method is synchronous. When a write request is processed
+ # (e.g. a writeChunk request on the async queue of GeneralSFTPFile), overwrite will
+ # be called and will update self.current_size if necessary before returning. Therefore,
+ # self.current_size will be up-to-date for a subsequent call to this read method, and
+ # so it is correct to do the check for a read past the end-of-file here.
if offset >= self.current_size:
def _eof(): raise EOFError("read past end of file")
return defer.execute(_eof)
if noisy: self.log("truncating read to %r bytes" % (length,), level=NOISY)
needed = min(offset + length, self.download_size)
- d = self.when_reached(needed)
- def _reached(ign):
+
+ # If we fail to reach the needed number of bytes, the read request will fail.
+ d = self.when_reached_or_failed(needed)
+ def _reached_in_read(res):
# It is not necessarily the case that self.downloaded >= needed, because
# the file might have been truncated (thus truncating the download) and
# then extended.
- assert self.current_size >= offset + length, (self.current_size, offset, length)
- if noisy: self.log("self.f = %r" % (self.f,), level=NOISY)
+ _assert(self.current_size >= offset + length,
+ current_size=self.current_size, offset=offset, length=length)
+ if noisy: self.log("_reached_in_read(%r), self.f = %r" % (res, self.f,), level=NOISY)
self.f.seek(offset)
return self.f.read(length)
- d.addCallback(_reached)
+ d.addCallback(_reached_in_read)
return d
- def when_reached(self, index):
- if noisy: self.log(".when_reached(%r)" % (index,), level=NOISY)
- if index <= self.downloaded: # already reached
- if noisy: self.log("already reached %r" % (index,), level=NOISY)
- return defer.succeed(None)
+ def when_reached_or_failed(self, index):
+ if noisy: self.log(".when_reached_or_failed(%r)" % (index,), level=NOISY)
+ def _reached(res):
+ if noisy: self.log("reached %r with result %r" % (index, res), level=NOISY)
+ return res
+
+ if self.done_status is not None:
+ return defer.execute(_reached, self.done_status)
+ if index <= self.downloaded: # already reached successfully
+ if noisy: self.log("already reached %r successfully" % (index,), level=NOISY)
+ return defer.succeed("already reached successfully")
d = defer.Deferred()
- def _reached(ign):
- if noisy: self.log("reached %r" % (index,), level=NOISY)
- return ign
d.addCallback(_reached)
heapq.heappush(self.milestones, (index, d))
return d
def when_done(self):
- return self.done
+ d = defer.Deferred()
+ self.done.addCallback(lambda ign: eventually_callback(d)(self.done_status))
+ return d
+
+ def download_done(self, res):
+ _assert(isinstance(res, (str, Failure)), res=res)
+ # Only the first call to download_done counts, but we log subsequent calls
+ # (multiple calls are normal).
+ if self.done_status is not None:
+ self.log("IGNORING extra call to download_done with result %r; previous result was %r"
+ % (res, self.done_status), level=OPERATIONAL)
+ return
+
+ self.log("DONE with result %r" % (res,), level=OPERATIONAL)
+
+ # We avoid errbacking self.done so that we are not left with an 'Unhandled error in Deferred'
+ # in case when_done() is never called. Instead we stash the failure in self.done_status,
+ # from where the callback added in when_done() can retrieve it.
+ self.done_status = res
+ eventually_callback(self.done)(None)
- def finish(self):
while len(self.milestones) > 0:
(next, d) = self.milestones[0]
- if noisy: self.log("MILESTONE FINISH %r %r" % (next, d), level=NOISY)
+ if noisy: self.log("MILESTONE FINISH %r %r %r" % (next, d, res), level=NOISY)
heapq.heappop(self.milestones)
# The callback means that the milestone has been reached if
# it is ever going to be. Note that the file may have been
# truncated to before the milestone.
- eventually_callback(d)(None)
-
- # FIXME: causes spurious failures
- #self.unregisterProducer()
+ eventually_callback(d)(res)
def close(self):
if not self.is_closed:
self.is_closed = True
try:
self.f.close()
- except BaseException as e:
+ except Exception, e:
self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD)
- self.finish()
+ self.download_done("closed")
+ return self.done_status
def unregisterProducer(self):
- if self.producer:
- self.producer.stopProducing()
- self.producer = None
+ # This will happen just before our client calls download_done, which will tell
+ # us the outcome of the download; we don't know the outcome at this point.
+ self.producer = None
+ self.log("producer unregistered", level=NOISY)
SIZE_THRESHOLD = 1000
implements(ISFTPFile)
"""I represent a file handle to a particular file on an SFTP connection.
I am used only for short immutable files opened in read-only mode.
- The file contents are downloaded to memory when I am created."""
+ When I am created, the file contents start to be downloaded to memory.
+ self.async is used to delay read requests until the download has finished."""
def __init__(self, userpath, filenode, metadata):
PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
if noisy: self.log(".__init__(%r, %r, %r)" % (userpath, filenode, metadata), level=NOISY)
- assert IFileNode.providedBy(filenode), filenode
+ precondition(isinstance(userpath, str) and IFileNode.providedBy(filenode),
+ userpath=userpath, filenode=filenode)
self.filenode = filenode
self.metadata = metadata
self.async = download_to_data(filenode)
# i.e. we respond with an EOF error iff offset is already at EOF.
if offset >= len(data):
- eventually_errback(d)(SFTPError(FX_EOF, "read at or past end of file"))
+ eventually_errback(d)(Failure(SFTPError(FX_EOF, "read at or past end of file")))
else:
- eventually_callback(d)(data[offset:min(offset+length, len(data))])
+ eventually_callback(d)(data[offset:offset+length]) # truncated if offset+length > len(data)
return data
self.async.addCallbacks(_read, eventually_errback(d))
d.addBoth(_convert_error, request)
if noisy: self.log(".__init__(%r, %r = %r, %r, <convergence censored>)" %
(userpath, flags, _repr_flags(flags), close_notify), level=NOISY)
+ precondition(isinstance(userpath, str), userpath=userpath)
self.userpath = userpath
self.flags = flags
self.close_notify = close_notify
self.log(".open(parent=%r, childname=%r, filenode=%r, metadata=%r)" %
(parent, childname, filenode, metadata), level=OPERATIONAL)
+ precondition(isinstance(childname, (unicode, NoneType)), childname=childname)
+ precondition(filenode is None or IFileNode.providedBy(filenode), filenode=filenode)
+ precondition(not self.closed, sftpfile=self)
+
# If the file has been renamed, the new (parent, childname) takes precedence.
if self.parent is None:
self.parent = parent
self.filenode = filenode
self.metadata = metadata
- if not self.closed:
- tempfile_maker = EncryptedTemporaryFile
+ tempfile_maker = EncryptedTemporaryFile
- if (self.flags & FXF_TRUNC) or not filenode:
- # We're either truncating or creating the file, so we don't need the old contents.
- self.consumer = OverwriteableFileConsumer(0, tempfile_maker)
- self.consumer.finish()
- else:
- assert IFileNode.providedBy(filenode), filenode
-
- # TODO: use download interface described in #993 when implemented.
- if filenode.is_mutable():
- self.async.addCallback(lambda ign: filenode.download_best_version())
- def _downloaded(data):
- self.consumer = OverwriteableFileConsumer(len(data), tempfile_maker)
- self.consumer.write(data)
- self.consumer.finish()
- return None
- self.async.addCallback(_downloaded)
- else:
- download_size = filenode.get_size()
- assert download_size is not None, "download_size is None"
- self.consumer = OverwriteableFileConsumer(download_size, tempfile_maker)
- def _read(ign):
- if noisy: self.log("_read immutable", level=NOISY)
- filenode.read(self.consumer, 0, None)
- self.async.addCallback(_read)
+ if (self.flags & FXF_TRUNC) or not filenode:
+ # We're either truncating or creating the file, so we don't need the old contents.
+ self.consumer = OverwriteableFileConsumer(0, tempfile_maker)
+ self.consumer.download_done("download not needed")
+ else:
+ self.async.addCallback(lambda ignored: filenode.get_best_readable_version())
+
+ def _read(version):
+ if noisy: self.log("_read", level=NOISY)
+ download_size = version.get_size()
+ _assert(download_size is not None)
+
+ self.consumer = OverwriteableFileConsumer(download_size, tempfile_maker)
+
+ d = version.read(self.consumer, 0, None)
+ def _finished(res):
+ if not isinstance(res, Failure):
+ res = "download finished"
+ self.consumer.download_done(res)
+ d.addBoth(_finished)
+ # It is correct to drop d here.
+ self.async.addCallback(_read)
eventually_callback(self.async)(None)
if noisy: self.log("open done", level=NOISY)
return self
+ def get_userpath(self):
+ return self.userpath
+
+ def get_direntry(self):
+ return _direntry_for(self.parent, self.childname)
+
def rename(self, new_userpath, new_parent, new_childname):
self.log(".rename(%r, %r, %r)" % (new_userpath, new_parent, new_childname), level=OPERATIONAL)
+ precondition(isinstance(new_userpath, str) and isinstance(new_childname, unicode),
+ new_userpath=new_userpath, new_childname=new_childname)
self.userpath = new_userpath
self.parent = new_parent
self.childname = new_childname
d.addBoth(_done)
return d
- def get_metadata(self):
- return self.metadata
-
def readChunk(self, offset, length):
request = ".readChunk(%r, %r)" % (offset, length)
self.log(request, level=OPERATIONAL)
def _read(ign):
if noisy: self.log("_read in readChunk(%r, %r)" % (offset, length), level=NOISY)
d2 = self.consumer.read(offset, length)
- d2.addCallbacks(eventually_callback(d), eventually_errback(d))
+ d2.addBoth(eventually_callback(d))
# It is correct to drop d2 here.
return None
self.async.addCallbacks(_read, eventually_errback(d))
# don't addErrback to self.async, just allow subsequent async ops to fail.
return defer.succeed(None)
+ def _do_close(self, res, d=None):
+ if noisy: self.log("_do_close(%r)" % (res,), level=NOISY)
+ status = None
+ if self.consumer:
+ status = self.consumer.close()
+
+ # We must close_notify before re-firing self.async.
+ if self.close_notify:
+ self.close_notify(self.userpath, self.parent, self.childname, self)
+
+ if not isinstance(res, Failure) and isinstance(status, Failure):
+ res = status
+
+ if d:
+ eventually_callback(d)(res)
+ elif isinstance(res, Failure):
+ self.log("suppressing %r" % (res,), level=OPERATIONAL)
+
def close(self):
request = ".close()"
self.log(request, level=OPERATIONAL)
self.closed = True
if not (self.flags & (FXF_WRITE | FXF_CREAT)):
- def _readonly_close():
- if self.consumer:
- self.consumer.close()
- return defer.execute(_readonly_close)
+ # We never fail a close of a handle opened only for reading, even if the file
+ # failed to download. (We could not do so deterministically, because it would
+ # depend on whether we reached the point of failure before abandoning the
+ # download.) Any reads that depended on file content that could not be downloaded
+ # will have failed. It is important that we don't close the consumer until
+ # previous read operations have completed.
+ self.async.addBoth(self._do_close)
+ return defer.succeed(None)
# We must capture the abandoned, parent, and childname variables synchronously
# at the close call. This is needed by the correctness arguments in the comments
abandoned = self.abandoned
parent = self.parent
childname = self.childname
-
+
# has_changed is set when writeChunk is called, not when the write occurs, so
# it is correct to optimize out the commit if it is False at the close call.
has_changed = self.has_changed
- def _committed(res):
- if noisy: self.log("_committed(%r)" % (res,), level=NOISY)
-
- self.consumer.close()
-
- # We must close_notify before re-firing self.async.
- if self.close_notify:
- self.close_notify(self.userpath, self.parent, self.childname, self)
- return res
-
- def _close(ign):
+ def _commit(ign):
d2 = self.consumer.when_done()
if self.filenode and self.filenode.is_mutable():
- self.log("update mutable file %r childname=%r" % (self.filenode, childname), level=OPERATIONAL)
+ self.log("update mutable file %r childname=%r metadata=%r"
+ % (self.filenode, childname, self.metadata), level=OPERATIONAL)
if self.metadata.get('no-write', False) and not self.filenode.is_readonly():
- assert parent and childname, (parent, childname, self.metadata)
+ _assert(parent and childname, parent=parent, childname=childname, metadata=self.metadata)
d2.addCallback(lambda ign: parent.set_metadata_for(childname, self.metadata))
- d2.addCallback(lambda ign: self.consumer.get_current_size())
- d2.addCallback(lambda size: self.consumer.read(0, size))
- d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents))
+ d2.addCallback(lambda ign: self.filenode.overwrite(MutableFileHandle(self.consumer.get_file())))
else:
def _add_file(ign):
self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL)
u = FileHandle(self.consumer.get_file(), self.convergence)
return parent.add_file(childname, u, metadata=self.metadata)
d2.addCallback(_add_file)
-
- d2.addBoth(_committed)
return d2
- d = defer.Deferred()
-
# If the file has been abandoned, we don't want the close operation to get "stuck",
- # even if self.async fails to re-fire. Doing the close independently of self.async
- # in that case ensures that dropping an ssh connection is sufficient to abandon
+ # even if self.async fails to re-fire. Completing the close independently of self.async
+ # in that case should ensure that dropping an ssh connection is sufficient to abandon
# any heisenfiles that were not explicitly closed in that connection.
if abandoned or not has_changed:
- d.addCallback(_committed)
+ d = defer.succeed(None)
+ self.async.addBoth(self._do_close)
else:
- self.async.addCallback(_close)
-
- self.async.addCallbacks(eventually_callback(d), eventually_errback(d))
+ d = defer.Deferred()
+ self.async.addCallback(_commit)
+ self.async.addBoth(self._do_close, d)
d.addBoth(_convert_error, request)
return d
d = defer.Deferred()
def _set(ign):
if noisy: self.log("_set(%r) in %r" % (ign, request), level=NOISY)
- if only_if_at and only_if_at != _direntry_for(self.parent, self.childname, self.filenode):
+ current_direntry = _direntry_for(self.parent, self.childname, self.filenode)
+ if only_if_at and only_if_at != current_direntry:
+ if noisy: self.log("not setting attributes: current_direntry=%r in %r" %
+ (current_direntry, request), level=NOISY)
return None
now = time()
self.metadata = update_metadata(self.metadata, _attrs_to_metadata(attrs), now)
if size is not None:
+ # TODO: should we refuse to truncate a file opened with FXF_APPEND?
+ # <http://allmydata.org/trac/tahoe-lafs/ticket/1037#comment:20>
self.consumer.set_current_size(size)
eventually_callback(d)(None)
return None
all_heisenfiles = {}
+def _reload():
+ global all_heisenfiles
+ all_heisenfiles = {}
class SFTPUserHandler(ConchUser, PrefixingLogMixin):
implements(ISFTPServer)
for f in files:
f.abandon()
- def _add_heisenfiles_by_path(self, userpath, files_to_add):
- self.log("._add_heisenfiles_by_path(%r, %r)" % (userpath, files_to_add), level=OPERATIONAL)
+ def _add_heisenfile_by_path(self, file):
+ self.log("._add_heisenfile_by_path(%r)" % (file,), level=OPERATIONAL)
+ userpath = file.get_userpath()
if userpath in self._heisenfiles:
- self._heisenfiles[userpath] += files_to_add
+ self._heisenfiles[userpath] += [file]
else:
- self._heisenfiles[userpath] = files_to_add
+ self._heisenfiles[userpath] = [file]
- def _add_heisenfiles_by_direntry(self, direntry, files_to_add):
- self.log("._add_heisenfiles_by_direntry(%r, %r)" % (direntry, files_to_add), level=OPERATIONAL)
+ def _add_heisenfile_by_direntry(self, file):
+ self.log("._add_heisenfile_by_direntry(%r)" % (file,), level=OPERATIONAL)
+ direntry = file.get_direntry()
if direntry:
if direntry in all_heisenfiles:
- all_heisenfiles[direntry] += files_to_add
+ all_heisenfiles[direntry] += [file]
else:
- all_heisenfiles[direntry] = files_to_add
+ all_heisenfiles[direntry] = [file]
def _abandon_any_heisenfiles(self, userpath, direntry):
request = "._abandon_any_heisenfiles(%r, %r)" % (userpath, direntry)
self.log(request, level=OPERATIONAL)
+ precondition(isinstance(userpath, str), userpath=userpath)
+
# First we synchronously mark all heisenfiles matching the userpath or direntry
# as abandoned, and remove them from the two heisenfile dicts. Then we .sync()
# each file that we abandoned.
(from_userpath, from_parent, from_childname, to_userpath, to_parent, to_childname, overwrite))
self.log(request, level=OPERATIONAL)
+ precondition((isinstance(from_userpath, str) and isinstance(from_childname, unicode) and
+ isinstance(to_userpath, str) and isinstance(to_childname, unicode)),
+ from_userpath=from_userpath, from_childname=from_childname, to_userpath=to_userpath, to_childname=to_childname)
+
+ if noisy: self.log("all_heisenfiles = %r\nself._heisenfiles = %r" % (all_heisenfiles, self._heisenfiles), level=NOISY)
+
# First we synchronously rename all heisenfiles matching the userpath or direntry.
# Then we .sync() each file that we renamed.
#
from_direntry = _direntry_for(from_parent, from_childname)
to_direntry = _direntry_for(to_parent, to_childname)
+ if noisy: self.log("from_direntry = %r, to_direntry = %r, len(all_heisenfiles) = %r, len(self._heisenfiles) = %r in %r" %
+ (from_direntry, to_direntry, len(all_heisenfiles), len(self._heisenfiles), request), level=NOISY)
+
if not overwrite and (to_userpath in self._heisenfiles or to_direntry in all_heisenfiles):
def _existing(): raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
+ if noisy: self.log("existing", level=NOISY)
return defer.execute(_existing)
from_files = []
if noisy: self.log("from_files = %r in %r" % (from_files, request), level=NOISY)
- self._add_heisenfiles_by_direntry(to_direntry, from_files)
- self._add_heisenfiles_by_path(to_userpath, from_files)
-
for f in from_files:
f.rename(to_userpath, to_parent, to_childname)
+ self._add_heisenfile_by_path(f)
+ self._add_heisenfile_by_direntry(f)
d = defer.succeed(None)
for f in from_files:
d.addBoth(f.sync)
def _done(ign):
- self.log("done %r" % (request,), level=OPERATIONAL)
+ if noisy: self.log("done: len(all_heisenfiles) = %r, len(self._heisenfiles) = %r in %r" %
+ (len(all_heisenfiles), len(self._heisenfiles), request), level=NOISY)
return len(from_files) > 0
d.addBoth(_done)
return d
request = "._update_attrs_for_heisenfiles(%r, %r, %r)" % (userpath, direntry, attrs)
self.log(request, level=OPERATIONAL)
+ _assert(isinstance(userpath, str) and isinstance(direntry, str),
+ userpath=userpath, direntry=direntry)
+
files = []
if direntry in all_heisenfiles:
files = all_heisenfiles[direntry]
def _done(ign):
self.log("done %r" % (request,), level=OPERATIONAL)
+ # TODO: this should not return True if only_if_at caused all files to be skipped.
return len(files) > 0
d.addBoth(_done)
return d
request = "._sync_heisenfiles(%r, %r, ignore=%r)" % (userpath, direntry, ignore)
self.log(request, level=OPERATIONAL)
+ _assert(isinstance(userpath, str) and isinstance(direntry, (str, NoneType)),
+ userpath=userpath, direntry=direntry)
+
files = []
if direntry in all_heisenfiles:
files = all_heisenfiles[direntry]
def _remove_heisenfile(self, userpath, parent, childname, file_to_remove):
if noisy: self.log("._remove_heisenfile(%r, %r, %r, %r)" % (userpath, parent, childname, file_to_remove), level=NOISY)
+ _assert(isinstance(userpath, str) and isinstance(childname, (unicode, NoneType)),
+ userpath=userpath, childname=childname)
+
direntry = _direntry_for(parent, childname)
if direntry in all_heisenfiles:
all_old_files = all_heisenfiles[direntry]
else:
del self._heisenfiles[userpath]
+ if noisy: self.log("all_heisenfiles = %r\nself._heisenfiles = %r" % (all_heisenfiles, self._heisenfiles), level=NOISY)
+
def _make_file(self, existing_file, userpath, flags, parent=None, childname=None, filenode=None, metadata=None):
if noisy: self.log("._make_file(%r, %r, %r = %r, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
(existing_file, userpath, flags, _repr_flags(flags), parent, childname, filenode, metadata),
level=NOISY)
- assert metadata is None or 'no-write' in metadata, metadata
+ _assert((isinstance(userpath, str) and isinstance(childname, (unicode, NoneType)) and
+ (metadata is None or 'no-write' in metadata)),
+ userpath=userpath, childname=childname, metadata=metadata)
writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0
direntry = _direntry_for(parent, childname, filenode)
def _got_file(file):
file.open(parent=parent, childname=childname, filenode=filenode, metadata=metadata)
if writing:
- self._add_heisenfiles_by_direntry(direntry, [file])
+ self._add_heisenfile_by_direntry(file)
return file
d.addCallback(_got_file)
return d
- def openFile(self, pathstring, flags, attrs):
- request = ".openFile(%r, %r = %r, %r)" % (pathstring, flags, _repr_flags(flags), attrs)
+ def openFile(self, pathstring, flags, attrs, delay=None):
+ request = ".openFile(%r, %r = %r, %r, delay=%r)" % (pathstring, flags, _repr_flags(flags), attrs, delay)
self.log(request, level=OPERATIONAL)
# This is used for both reading and writing.
if flags & (FXF_WRITE | FXF_CREAT):
file = GeneralSFTPFile(userpath, flags, self._remove_heisenfile, self._convergence)
- self._add_heisenfiles_by_path(userpath, [file])
+ self._add_heisenfile_by_path(file)
else:
# We haven't decided which file implementation to use yet.
file = None
# Note that the permission checks below are for more precise error reporting on
# the open call; later operations would fail even if we did not make these checks.
- d = self._get_root(path)
+ d = delay or defer.succeed(None)
+ d.addCallback(lambda ign: self._get_root(path))
def _got_root( (root, path) ):
if root.is_unknown():
raise SFTPError(FX_PERMISSION_DENIED,
if (flags & FXF_WRITE) and root.is_readonly():
raise SFTPError(FX_PERMISSION_DENIED,
"cannot write to a non-writeable filecap without a parent directory")
- if (flags & FXF_WRITE) and root.is_mutable() and desired_metadata.get('no-write', False):
- raise SFTPError(FX_PERMISSION_DENIED,
- "cannot write to a mutable filecap without a parent directory, when the "
- "specified permissions would require the link from the parent to be made read-only")
if flags & FXF_EXCL:
raise SFTPError(FX_FAILURE,
"cannot create a file exclusively when it already exists")
# reported as r--r--r--, which is appropriate because an immutable file can't be
# written via this path.
- metadata['no-write'] = _no_write(True, root, metadata)
+ metadata['no-write'] = _no_write(True, root)
return self._make_file(file, userpath, flags, filenode=root, metadata=metadata)
else:
# case 2
if noisy: self.log("_got_child( (%r, %r) )" % (filenode, current_metadata), level=NOISY)
metadata = update_metadata(current_metadata, desired_metadata, time())
- metadata['no-write'] = _no_write(parent_readonly, filenode, metadata)
+
+ # Ignore the permissions of the desired_metadata in an open call. The permissions
+ # can only be set by setAttrs.
+ metadata['no-write'] = _no_write(parent_readonly, filenode, current_metadata)
if filenode.is_unknown():
raise SFTPError(FX_PERMISSION_DENIED,
# For the standard SSH_FXP_RENAME operation, overwrite=False.
# We also support the posix-rename@openssh.com extension, which uses overwrite=True.
- d2 = defer.fail(NoSuchChildError())
+ d2 = defer.succeed(None)
if not overwrite:
d2.addCallback(lambda ign: to_parent.get(to_childname))
- def _expect_fail(res):
- if not isinstance(res, Failure):
- raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
+ def _expect_fail(res):
+ if not isinstance(res, Failure):
+ raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
- # It is OK if we fail for errors other than NoSuchChildError, since that probably
- # indicates some problem accessing the destination directory.
- res.trap(NoSuchChildError)
- d2.addBoth(_expect_fail)
+ # It is OK if we fail for errors other than NoSuchChildError, since that probably
+ # indicates some problem accessing the destination directory.
+ res.trap(NoSuchChildError)
+ d2.addBoth(_expect_fail)
# If there are heisenfiles to be written at the 'from' direntry, then ensure
# they will now be written at the 'to' direntry instead.
d2.addCallback(lambda ign: parent.get_child_and_metadata_at_path([childname]))
def _got( (child, metadata) ):
if noisy: self.log("_got( (%r, %r) )" % (child, metadata), level=NOISY)
- assert IDirectoryNode.providedBy(parent), parent
+ _assert(IDirectoryNode.providedBy(parent), parent=parent)
metadata['no-write'] = _no_write(parent.is_readonly(), child, metadata)
d3 = child.get_current_size()
d3.addCallback(lambda size: _populate_attrs(child, metadata, size=size))
desired_metadata = _attrs_to_metadata(attrs)
if noisy: self.log("desired_metadata = %r" % (desired_metadata,), level=NOISY)
- return parent_or_node.set_metadata_for(childname, desired_metadata)
+ d3 = parent_or_node.set_metadata_for(childname, desired_metadata)
+ def _nosuch(err):
+ if updated_heisenfiles:
+ err.trap(NoSuchChildError)
+ else:
+ return err
+ d3.addErrback(_nosuch)
+ return d3
d2.addCallback(_update)
d2.addCallback(lambda ign: None)
return d2
if extensionName == 'posix-rename@openssh.com':
def _bad(): raise SFTPError(FX_BAD_MESSAGE, "could not parse posix-rename@openssh.com request")
+ if 4 > len(extensionData): return defer.execute(_bad)
(fromPathLen,) = struct.unpack('>L', extensionData[0:4])
if 8 + fromPathLen > len(extensionData): return defer.execute(_bad)
def _path_from_string(self, pathstring):
if noisy: self.log("CONVERT %r" % (pathstring,), level=NOISY)
+ _assert(isinstance(pathstring, str), pathstring=pathstring)
+
# The home directory is the root directory.
pathstring = pathstring.strip("/")
if pathstring == "" or pathstring == ".":
return d
-class SFTPUser(ConchUser, PrefixingLogMixin):
- implements(ISession)
- def __init__(self, check_abort, client, rootnode, username, convergence):
- ConchUser.__init__(self)
- PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
+class FakeTransport:
+ implements(ITransport)
+ def write(self, data):
+ logmsg("FakeTransport.write(<data of length %r>)" % (len(data),), level=NOISY)
- self.channelLookup["session"] = session.SSHSession
- self.subsystemLookup["sftp"] = FileTransferServer
+ def writeSequence(self, data):
+ logmsg("FakeTransport.writeSequence(...)", level=NOISY)
+
+ def loseConnection(self):
+ logmsg("FakeTransport.loseConnection()", level=NOISY)
+
+ # getPeer and getHost can just raise errors, since we don't know what to return
- self.check_abort = check_abort
- self.client = client
- self.root = rootnode
- self.username = username
- self.convergence = convergence
+
+class ShellSession(PrefixingLogMixin):
+ implements(ISession)
+ def __init__(self, userHandler):
+ PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
+ if noisy: self.log(".__init__(%r)" % (userHandler), level=NOISY)
def getPty(self, terminal, windowSize, attrs):
self.log(".getPty(%r, %r, %r)" % (terminal, windowSize, attrs), level=OPERATIONAL)
- raise NotImplementedError
def openShell(self, protocol):
self.log(".openShell(%r)" % (protocol,), level=OPERATIONAL)
- raise NotImplementedError
+ if hasattr(protocol, 'transport') and protocol.transport is None:
+ protocol.transport = FakeTransport() # work around Twisted bug
+
+ return self._unsupported(protocol)
def execCommand(self, protocol, cmd):
self.log(".execCommand(%r, %r)" % (protocol, cmd), level=OPERATIONAL)
- raise NotImplementedError
+ if hasattr(protocol, 'transport') and protocol.transport is None:
+ protocol.transport = FakeTransport() # work around Twisted bug
+
+ d = defer.succeed(None)
+ if cmd == "df -P -k /":
+ d.addCallback(lambda ign: protocol.write(
+ "Filesystem 1024-blocks Used Available Capacity Mounted on\r\n"
+ "tahoe 628318530 314159265 314159265 50% /\r\n"))
+ d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessDone(None))))
+ else:
+ d.addCallback(lambda ign: self._unsupported(protocol))
+ return d
+
+ def _unsupported(self, protocol):
+ d = defer.succeed(None)
+ d.addCallback(lambda ign: protocol.errReceived(
+ "This server supports only the SFTP protocol. It does not support SCP,\r\n"
+ "interactive shell sessions, or commands other than one needed by sshfs.\r\n"))
+ d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessTerminated(exitCode=1))))
+ return d
def windowChanged(self, newWindowSize):
self.log(".windowChanged(%r)" % (newWindowSize,), level=OPERATIONAL)
- def eofReceived():
+ def eofReceived(self):
self.log(".eofReceived()", level=OPERATIONAL)
def closed(self):
self.log(".closed()", level=OPERATIONAL)
-# if you have an SFTPUser, and you want something that provides ISFTPServer,
-# then you get SFTPHandler(user)
-components.registerAdapter(SFTPHandler, SFTPUser, ISFTPServer)
+# If you have an SFTPUserHandler and want something that provides ISession, you get
+# ShellSession(userHandler).
+# We use adaptation because this must be a different object to the SFTPUserHandler.
+components.registerAdapter(ShellSession, SFTPUserHandler, ISession)
+
-from auth import AccountURLChecker, AccountFileChecker, NeedRootcapLookupScheme
+from allmydata.frontends.auth import AccountURLChecker, AccountFileChecker, NeedRootcapLookupScheme
class Dispatcher:
implements(portal.IRealm)
self._client = client
def requestAvatar(self, avatarID, mind, interface):
- assert interface == IConchUser, interface
+ _assert(interface == IConchUser, interface=interface)
rootnode = self._client.create_node_from_uri(avatarID.rootcap)
handler = SFTPUserHandler(self._client, rootnode, avatarID.username)
return (interface, handler, handler.logout)