d = defer.Deferred()
def _read(data):
- if noisy: self.log("_read(%r) in readChunk(%r, %r)" % (data, offset, length), level=NOISY)
+ if noisy: self.log("_read(<data of length %r>) in readChunk(%r, %r)" % (len(data), offset, length), level=NOISY)
# "In response to this request, the server will read as many bytes as it
# can from the file (up to 'len'), and return them in a SSH_FXP_DATA
# Creating or truncating the file is a change, but if FXF_EXCL is set, a zero-length file has already been created.
self.has_changed = (flags & (FXF_CREAT | FXF_TRUNC)) and not (flags & FXF_EXCL)
self.closed = False
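+ # self.added records that close() will add a new directory entry for this file;
+ # self.removed records that the file has been removed while open.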
+ self.added = False
self.removed = False
# self.consumer should only be relied on in callbacks for self.async, since it might
# not be set before then.
self.removed = True
+ def sync(self):
+ self.log(".sync()", level=OPERATIONAL)
+
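+ # Return a Deferred that fires (via 'eventually') once every operation already
+ # queued on self.async has completed, whether it succeeded or failed.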
+ d = defer.Deferred()
+ self.async.addBoth(eventually_callback(d))
+ return d
+
def readChunk(self, offset, length):
request = ".readChunk(%r, %r)" % (offset, length)
self.log(request, level=OPERATIONAL)
# Note that we return without waiting for the write to occur. Reads and
# close wait for prior writes, and will fail if any prior operation failed.
- # This is ok because SFTP makes no guarantee that the request completes
- # before the write. In fact it explicitly allows write errors to be delayed
- # until close:
+ # This is ok because SFTP makes no guarantee that the write completes
+ # before the request does. In fact it explicitly allows write errors to be
+ # delayed until close:
# "One should note that on some server platforms even a close can fail.
# This can happen e.g. if the server operating system caches writes,
# and an error occurs while flushing cached writes during the close."
if self.has_changed and not self.removed:
d2.addCallback(lambda ign: self.consumer.when_done())
if self.filenode and self.filenode.is_mutable():
+ self.log("update mutable file %r childname=%r" % (self.filenode, self.childname,), level=OPERATIONAL)
d2.addCallback(lambda ign: self.consumer.get_current_size())
d2.addCallback(lambda size: self.consumer.read(0, size))
d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents))
else:
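+ # Record that we are adding a new directory entry, so that _closed below can
+ # detect the race where the file is removed before the add completes.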
+ self.added = True
def _add_file(ign):
self.log("_add_file childname=%r" % (self.childname,), level=OPERATIONAL)
u = FileHandle(self.consumer.get_file(), self.convergence)
self.async.addCallbacks(eventually_callback(d), eventually_errback(d))
def _closed(res):
+ if noisy: self.log("_closed(%r)" % (res,), level=NOISY)
self.close_notify(self.parent, self.childname, self)
# It is possible for there to be a race between adding the file and removing it.
- if self.removed:
+ if self.added and self.removed:
self.log("oops, we added %r but must now remove it" % (self.childname,), level=OPERATIONAL)
d2 = self.parent.delete(self.childname)
+ d2.addBoth(_convert_error, request) # just for logging
d2.addBoth(lambda ign: res)
return d2
return res
def _add_open_files(self, direntry, files_to_add):
+ if noisy: self.log("._add_open_files(%r, %r)" % (direntry, files_to_add), level=NOISY)
+
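+ # Record the files both in this handler's _open_files map and in the
+ # module-level all_open_files map, which also remembers the open time.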
if direntry:
if direntry in self._open_files:
self._open_files[direntry] += files_to_add
all_open_files[direntry] = (files_to_add, time())
def _remove_any_open_files(self, direntry):
- removed = False
+ if noisy: self.log("._remove_any_open_files(%r)" % (direntry,), level=NOISY)
+
if direntry in self._open_files:
- for f in self._open_files[direntry]:
- f.remove()
del self._open_files[direntry]
- removed = True
if direntry in all_open_files:
(files, opentime) = all_open_files[direntry]
for f in files:
f.remove()
del all_open_files[direntry]
- removed = True
- return removed
+ return True
+
+ return False
+
+ def _sync_open_files(self, direntry):
+ if noisy: self.log("._sync_open_files(%r)" % (direntry,), level=NOISY)
+
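+ # Chain a sync() of every open file at this direntry, so that the returned
+ # Deferred fires only after all of their queued operations have completed.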
+ d = defer.succeed(None)
+ if direntry in all_open_files:
+ (files, opentime) = all_open_files[direntry]
+ for f in files:
+ d.addCallback(lambda ign, f=f: f.sync()) # bind f per-iteration; a plain closure would see only the last file
+
+ return d
def _close_notify(self, parent, childname, file_to_remove):
+ if noisy: self.log("._close_notify(%r, %r, %r)" % (parent, childname, file_to_remove), level=NOISY)
+
direntry = self._direntry_for(parent, childname)
if direntry in self._open_files:
old_files = self._open_files[direntry]
"""When an direntry is renamed, any open files for that direntry are also renamed.
Return True if there were any open files at from_direntry."""
+ if noisy: self.log("._rename_open_files(%r, %r, %r, %r)" %
+ (from_parent, from_childname, to_parent, to_childname), level=NOISY)
+
from_direntry = self._direntry_for(from_parent, from_childname)
to_direntry = self._direntry_for(to_parent, to_childname)
else:
return False
- def _direntry_for(self, parent, childname):
- if parent and childname:
- rw_uri = parent.get_write_uri()
- if rw_uri:
+ def _direntry_for(self, filenode_or_parent, childname=None):
+ if filenode_or_parent:
+ rw_uri = filenode_or_parent.get_write_uri()
+ if rw_uri and childname:
return rw_uri + "/" + childname.encode('utf-8')
+ else:
+ return rw_uri
return None
assert metadata is None or 'readonly' in metadata, metadata
writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0
- if not writing and (flags & FXF_READ) and filenode and not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
- return ShortReadOnlySFTPFile(filenode, metadata)
+ if childname:
+ direntry = self._direntry_for(parent, childname)
else:
- direntry = None
- if writing:
- direntry = self._direntry_for(parent, childname)
+ direntry = self._direntry_for(filenode)
- file = GeneralSFTPFile(self._close_notify, flags, self._convergence,
- parent=parent, childname=childname, filenode=filenode, metadata=metadata)
- self._add_open_files(direntry, [file])
- return file
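+ # If this direntry was recently written and closed, wait for the previous
+ # upload/publish to complete before the open succeeds (see the comment in
+ # openFile about delayed opens).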
+ d = self._sync_open_files(direntry)
+
+ if not writing and (flags & FXF_READ) and filenode and not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
+ d.addCallback(lambda ign: ShortReadOnlySFTPFile(filenode, metadata))
+ else:
+ d.addCallback(lambda ign: GeneralSFTPFile(self._close_notify, flags, self._convergence,
+ parent=parent, childname=childname, filenode=filenode, metadata=metadata))
+ def _add_to_open(file):
+ if writing:
+ self._add_open_files(direntry, [file])
+ return file
+ d.addCallback(_add_to_open)
+ return d
def openFile(self, pathstring, flags, attrs):
request = ".openFile(%r, %r = %r, %r)" % (pathstring, flags, _repr_flags(flags), attrs)
# This is used for both reading and writing.
# First exclude invalid combinations of flags.
- # /usr/bin/sftp 'get' gives us FXF_READ, while 'put' on a new file
- # gives FXF_WRITE | FXF_CREAT | FXF_TRUNC. I'm guessing that 'put' on an
- # existing file gives the same.
-
if not (flags & (FXF_READ | FXF_WRITE)):
raise SFTPError(FX_BAD_MESSAGE,
"invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set")
# d. the child is immutable: if we are trying to open it write-only or
# read/write, then we must be able to write to the parent directory.
#
- # To reduce latency, open succeeds as soon as these conditions are met, even
- # though there might be a failure in downloading the existing file or uploading
- # a new one.
+ # To reduce latency, open normally succeeds as soon as these conditions are
+ # met, even though there might be a failure in downloading the existing file
+ # or uploading a new one. However, there is an exception: if a file has been
+ # written, then closed, and is now being reopened, we have to delay the open
+ # until the previous upload/publish has completed. This is necessary because
+ # sshfs does not wait for the result of an SSH_FXP_CLOSE message before
+ # reporting to the client that a file has been closed. It applies both to
+ # mutable files and to directory entries linked to an immutable file.
#
# Note that the permission checks below are for more precise error reporting on
# the open call; later operations would fail even if we did not make these checks.
if childname is None:
raise SFTPError(FX_NO_SUCH_FILE, "cannot remove an object specified by URI")
- removed = self._remove_any_open_files(self._direntry_for(parent, childname))
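+ # Remove any open files at this direntry first; if the parent directory then
+ # has no such child but open files were removed, the NoSuchChildError is
+ # suppressed below.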
+ direntry = self._direntry_for(parent, childname)
+ removed = self._remove_any_open_files(direntry)
d2 = parent.get(childname)
def _got_child(child):
def _no_child(err):
if removed and err.check(NoSuchChildError):
# suppress NoSuchChildError if an open file was removed
+ if noisy: self.log("suppressing NoSuchChildError for %r because it was removed as an open file" %
+ (direntry,), level=NOISY)
return None
else:
return err
# reported as the update time of the best version. But that
# information isn't currently stored in mutable shares, I think.
+ # TODO: some clients will incorrectly try to get the attributes
+ # of a file immediately after opening it, before it has been put
+ # into the all_open_files table. This is a race condition bug in
+ # the client, but we probably need to handle it anyway.
+
path = self._path_from_string(pathstring)
d = self._get_parent_or_node(path)
def _got_parent_or_node( (parent_or_node, childname) ):