-import os, tempfile, heapq, binascii, traceback, array, stat, struct
+import heapq, traceback, array, stat, struct
from types import NoneType
from stat import S_IFREG, S_IFDIR
from time import time, strftime, localtime
from allmydata.interfaces import IFileNode, IDirectoryNode, ExistingChildError, \
NoSuchChildError, ChildOfWrongTypeError
from allmydata.mutable.common import NotWriteableError
+from allmydata.mutable.publish import MutableFileHandle
from allmydata.immutable.upload import FileHandle
from allmydata.dirnode import update_metadata
-
-from pycryptopp.cipher.aes import AES
+from allmydata.util.fileutil import EncryptedTemporaryFile
noisy = True
use_foolscap_logging = True
def _convert_error(res, request):
+ """If res is not a Failure, return it, otherwise reraise the appropriate
+ SFTPError."""
+
if not isinstance(res, Failure):
logged_res = res
if isinstance(res, str): logged_res = "<data of length %r>" % (len(res),)
logmsg("RAISE %r %r" % (request, err.value), level=OPERATIONAL)
try:
if noisy: logmsg(traceback.format_exc(err.value), level=NOISY)
- except: # pragma: no cover
+ except Exception: # pragma: no cover
pass
# The message argument to SFTPError must not reveal information that
- # might compromise anonymity.
+ # might compromise anonymity, if we are running over an anonymous network.
if err.check(SFTPError):
# original raiser of SFTPError has responsibility to ensure anonymity
st_gid = "tahoe"
st_mtime = attrs.get("mtime", 0)
st_mode = attrs["permissions"]
- # TODO: check that clients are okay with this being a "?".
- # (They should be because the longname is intended for human
- # consumption.)
- st_size = attrs.get("size", "?")
+
+ # Some clients won't tolerate '?' in the size field (#1337).
+ st_size = attrs.get("size", 0)
+
# We don't know how many links there really are to this object.
st_nlink = 1
# We would prefer to omit atime, but SFTP version 3 can only
# accept mtime if atime is also set.
if 'linkmotime' in metadata.get('tahoe', {}):
- attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['tahoe']['linkmotime'])
+ attrs['ctime'] = attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['tahoe']['linkmotime'])
elif 'mtime' in metadata:
- attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['mtime'])
+ attrs['ctime'] = attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['mtime'])
if 'linkcrtime' in metadata.get('tahoe', {}):
attrs['createtime'] = _to_sftp_time(metadata['tahoe']['linkcrtime'])
- if 'ctime' in metadata:
- attrs['ctime'] = _to_sftp_time(metadata['ctime'])
-
attrs['permissions'] = perms
# twisted.conch.ssh.filetransfer only implements SFTP version 3,
return None
-class EncryptedTemporaryFile(PrefixingLogMixin):
- # not implemented: next, readline, readlines, xreadlines, writelines
-
- def __init__(self):
- PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
- self.file = tempfile.TemporaryFile()
- self.key = os.urandom(16) # AES-128
-
- def _crypt(self, offset, data):
- # TODO: use random-access AES (pycryptopp ticket #18)
- offset_big = offset // 16
- offset_small = offset % 16
- iv = binascii.unhexlify("%032x" % offset_big)
- cipher = AES(self.key, iv=iv)
- cipher.process("\x00"*offset_small)
- return cipher.process(data)
-
- def close(self):
- self.file.close()
-
- def flush(self):
- self.file.flush()
-
- def seek(self, offset, whence=0): # 0 = SEEK_SET
- if noisy: self.log(".seek(%r, %r)" % (offset, whence), level=NOISY)
- self.file.seek(offset, whence)
-
- def tell(self):
- offset = self.file.tell()
- if noisy: self.log(".tell() = %r" % (offset,), level=NOISY)
- return offset
-
- def read(self, size=-1):
- if noisy: self.log(".read(%r)" % (size,), level=NOISY)
- index = self.file.tell()
- ciphertext = self.file.read(size)
- plaintext = self._crypt(index, ciphertext)
- return plaintext
-
- def write(self, plaintext):
- if noisy: self.log(".write(<data of length %r>)" % (len(plaintext),), level=NOISY)
- index = self.file.tell()
- ciphertext = self._crypt(index, plaintext)
- self.file.write(ciphertext)
-
- def truncate(self, newsize):
- if noisy: self.log(".truncate(%r)" % (newsize,), level=NOISY)
- self.file.truncate(newsize)
-
-
class OverwriteableFileConsumer(PrefixingLogMixin):
implements(IFinishableConsumer)
"""I act both as a consumer for the download of the original file contents, and as a
self.overwrite(self.current_size, "\x00" * (size - self.current_size))
self.current_size = size
- # invariant: self.download_size <= self.current_size
+ # make the invariant self.download_size <= self.current_size be true again
if size < self.download_size:
self.download_size = size
+
if self.downloaded >= self.download_size:
self.finish()
def registerProducer(self, p, streaming):
if noisy: self.log(".registerProducer(%r, streaming=%r)" % (p, streaming), level=NOISY)
+ if self.producer is not None:
+ raise RuntimeError("producer is already registered")
+
self.producer = p
if streaming:
# call resumeProducing once to start things off
return
if noisy: self.log("MILESTONE %r %r" % (next, d), level=NOISY)
heapq.heappop(self.milestones)
- eventually_callback(d)(None)
+ eventually(d.callback, None)
if milestone >= self.download_size:
self.finish()
The caller must perform no more overwrites until the Deferred has fired."""
if noisy: self.log(".read(%r, %r), current_size = %r" % (offset, length, self.current_size), level=NOISY)
+
+ # Note that the overwrite method is synchronous. When a write request is processed
+ # (e.g. a writeChunk request on the async queue of GeneralSFTPFile), overwrite will
+ # be called and will update self.current_size if necessary before returning. Therefore,
+ # self.current_size will be up-to-date for a subsequent call to this read method, and
+ # so it is correct to do the check for a read past the end-of-file here.
if offset >= self.current_size:
def _eof(): raise EOFError("read past end of file")
return defer.execute(_eof)
return self.done
def finish(self):
+ """Called by the producer when it has finished producing, or when we have
+ received enough bytes, or as a result of a close. Defined by IFinishableConsumer."""
+
while len(self.milestones) > 0:
(next, d) = self.milestones[0]
if noisy: self.log("MILESTONE FINISH %r %r" % (next, d), level=NOISY)
# The callback means that the milestone has been reached if
# it is ever going to be. Note that the file may have been
# truncated to before the milestone.
- eventually_callback(d)(None)
-
- # FIXME: causes spurious failures
- #self.unregisterProducer()
+ eventually(d.callback, None)
def close(self):
if not self.is_closed:
self.is_closed = True
try:
self.f.close()
- except BaseException, e:
+ except Exception, e:
self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD)
self.finish()
def unregisterProducer(self):
- if self.producer:
- self.producer.stopProducing()
- self.producer = None
+ pass
SIZE_THRESHOLD = 1000
implements(ISFTPFile)
"""I represent a file handle to a particular file on an SFTP connection.
I am used only for short immutable files opened in read-only mode.
- The file contents are downloaded to memory when I am created."""
+ When I am created, the file contents start to be downloaded to memory.
+ self.async is used to delay read requests until the download has finished."""
def __init__(self, userpath, filenode, metadata):
PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
# i.e. we respond with an EOF error iff offset is already at EOF.
if offset >= len(data):
- eventually_errback(d)(SFTPError(FX_EOF, "read at or past end of file"))
+ eventually(d.errback, SFTPError(FX_EOF, "read at or past end of file"))
else:
- eventually_callback(d)(data[offset:min(offset+length, len(data))])
+ eventually(d.callback, data[offset:offset+length]) # truncated if offset+length > len(data)
return data
self.async.addCallbacks(_read, eventually_errback(d))
d.addBoth(_convert_error, request)
else:
assert IFileNode.providedBy(filenode), filenode
- # TODO: use download interface described in #993 when implemented.
- if filenode.is_mutable():
- self.async.addCallback(lambda ign: filenode.download_best_version())
- def _downloaded(data):
- self.consumer = OverwriteableFileConsumer(len(data), tempfile_maker)
- self.consumer.write(data)
- self.consumer.finish()
- return None
- self.async.addCallback(_downloaded)
- else:
- download_size = filenode.get_size()
- assert download_size is not None, "download_size is None"
+ self.async.addCallback(lambda ignored: filenode.get_best_readable_version())
+
+ def _read(version):
+ if noisy: self.log("_read", level=NOISY)
+ download_size = version.get_size()
+ assert download_size is not None
+
self.consumer = OverwriteableFileConsumer(download_size, tempfile_maker)
- def _read(ign):
- if noisy: self.log("_read immutable", level=NOISY)
- filenode.read(self.consumer, 0, None)
- self.async.addCallback(_read)
- eventually_callback(self.async)(None)
+ version.read(self.consumer, 0, None)
+ self.async.addCallback(_read)
+
+ eventually(self.async.callback, None)
if noisy: self.log("open done", level=NOISY)
return self
d.addBoth(_done)
return d
- def get_metadata(self):
- return self.metadata
-
def readChunk(self, offset, length):
request = ".readChunk(%r, %r)" % (offset, length)
self.log(request, level=OPERATIONAL)
abandoned = self.abandoned
parent = self.parent
childname = self.childname
-
+
# has_changed is set when writeChunk is called, not when the write occurs, so
# it is correct to optimize out the commit if it is False at the close call.
has_changed = self.has_changed
def _close(ign):
d2 = self.consumer.when_done()
if self.filenode and self.filenode.is_mutable():
- self.log("update mutable file %r childname=%r" % (self.filenode, childname), level=OPERATIONAL)
+ self.log("update mutable file %r childname=%r metadata=%r" % (self.filenode, childname, self.metadata), level=OPERATIONAL)
if self.metadata.get('no-write', False) and not self.filenode.is_readonly():
assert parent and childname, (parent, childname, self.metadata)
d2.addCallback(lambda ign: parent.set_metadata_for(childname, self.metadata))
- d2.addCallback(lambda ign: self.consumer.get_current_size())
- d2.addCallback(lambda size: self.consumer.read(0, size))
- d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents))
+ d2.addCallback(lambda ign: self.filenode.overwrite(MutableFileHandle(self.consumer.get_file())))
else:
def _add_file(ign):
self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL)
# self.filenode might be None, but that's ok.
attrs = _populate_attrs(self.filenode, self.metadata, size=self.consumer.get_current_size())
- eventually_callback(d)(attrs)
+ eventually(d.callback, attrs)
return None
self.async.addCallbacks(_get, eventually_errback(d))
d.addBoth(_convert_error, request)
d = defer.Deferred()
def _set(ign):
if noisy: self.log("_set(%r) in %r" % (ign, request), level=NOISY)
- if only_if_at and only_if_at != _direntry_for(self.parent, self.childname, self.filenode):
+ current_direntry = _direntry_for(self.parent, self.childname, self.filenode)
+ if only_if_at and only_if_at != current_direntry:
+ if noisy: self.log("not setting attributes: current_direntry=%r in %r" %
+ (current_direntry, request), level=NOISY)
return None
now = time()
# TODO: should we refuse to truncate a file opened with FXF_APPEND?
# <http://allmydata.org/trac/tahoe-lafs/ticket/1037#comment:20>
self.consumer.set_current_size(size)
- eventually_callback(d)(None)
+ eventually(d.callback, None)
return None
self.async.addCallbacks(_set, eventually_errback(d))
d.addBoth(_convert_error, request)
all_heisenfiles = {}
+def _reload():
+ global all_heisenfiles
+ all_heisenfiles = {}
class SFTPUserHandler(ConchUser, PrefixingLogMixin):
implements(ISFTPServer)
def _abandon_any_heisenfiles(self, userpath, direntry):
request = "._abandon_any_heisenfiles(%r, %r)" % (userpath, direntry)
self.log(request, level=OPERATIONAL)
-
+
assert isinstance(userpath, str), userpath
# First we synchronously mark all heisenfiles matching the userpath or direntry
from_direntry = _direntry_for(from_parent, from_childname)
to_direntry = _direntry_for(to_parent, to_childname)
- if noisy: self.log("from_direntry = %r, to_direntry = %r in %r" %
- (from_direntry, to_direntry, request), level=NOISY)
+ if noisy: self.log("from_direntry = %r, to_direntry = %r, len(all_heisenfiles) = %r, len(self._heisenfiles) = %r in %r" %
+ (from_direntry, to_direntry, len(all_heisenfiles), len(self._heisenfiles), request), level=NOISY)
if not overwrite and (to_userpath in self._heisenfiles or to_direntry in all_heisenfiles):
def _existing(): raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
d.addBoth(f.sync)
def _done(ign):
- if noisy:
- self.log("done %r\nall_heisenfiles = %r\nself._heisenfiles = %r" % (request, all_heisenfiles, self._heisenfiles), level=OPERATIONAL)
- else: # pragma: no cover
- self.log("done %r" % (request,), level=OPERATIONAL)
+ if noisy: self.log("done: len(all_heisenfiles) = %r, len(self._heisenfiles) = %r in %r" %
+ (len(all_heisenfiles), len(self._heisenfiles), request), level=NOISY)
return len(from_files) > 0
d.addBoth(_done)
return d
# For the standard SSH_FXP_RENAME operation, overwrite=False.
# We also support the posix-rename@openssh.com extension, which uses overwrite=True.
- d2 = defer.fail(NoSuchChildError())
+ d2 = defer.succeed(None)
if not overwrite:
d2.addCallback(lambda ign: to_parent.get(to_childname))
- def _expect_fail(res):
- if not isinstance(res, Failure):
- raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
+ def _expect_fail(res):
+ if not isinstance(res, Failure):
+ raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
- # It is OK if we fail for errors other than NoSuchChildError, since that probably
- # indicates some problem accessing the destination directory.
- res.trap(NoSuchChildError)
- d2.addBoth(_expect_fail)
+ # It is OK if we fail for errors other than NoSuchChildError, since that probably
+ # indicates some problem accessing the destination directory.
+ res.trap(NoSuchChildError)
+ d2.addBoth(_expect_fail)
# If there are heisenfiles to be written at the 'from' direntry, then ensure
# they will now be written at the 'to' direntry instead.
if extensionName == 'posix-rename@openssh.com':
def _bad(): raise SFTPError(FX_BAD_MESSAGE, "could not parse posix-rename@openssh.com request")
+ if 4 > len(extensionData): return defer.execute(_bad)
(fromPathLen,) = struct.unpack('>L', extensionData[0:4])
if 8 + fromPathLen > len(extensionData): return defer.execute(_bad)
if hasattr(protocol, 'transport') and protocol.transport is None:
protocol.transport = FakeTransport() # work around Twisted bug
- d = defer.succeed(None)
- d.addCallback(lambda ign: protocol.write("This server supports only SFTP, not shell sessions.\n"))
- d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessTerminated(exitCode=1))))
- return d
+ return self._unsupported(protocol)
def execCommand(self, protocol, cmd):
self.log(".execCommand(%r, %r)" % (protocol, cmd), level=OPERATIONAL)
d = defer.succeed(None)
if cmd == "df -P -k /":
d.addCallback(lambda ign: protocol.write(
- "Filesystem 1024-blocks Used Available Capacity Mounted on\n"
- "tahoe 628318530 314159265 314159265 50% /\n"))
+ "Filesystem 1024-blocks Used Available Capacity Mounted on\r\n"
+ "tahoe 628318530 314159265 314159265 50% /\r\n"))
d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessDone(None))))
else:
- d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessTerminated(exitCode=1))))
+ d.addCallback(lambda ign: self._unsupported(protocol))
+ return d
+
+ def _unsupported(self, protocol):
+ d = defer.succeed(None)
+ d.addCallback(lambda ign: protocol.errReceived(
+ "This server supports only the SFTP protocol. It does not support SCP,\r\n"
+ "interactive shell sessions, or commands other than one needed by sshfs.\r\n"))
+ d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessTerminated(exitCode=1))))
return d
def windowChanged(self, newWindowSize):