From: david-sarah Date: Tue, 25 May 2010 23:02:57 +0000 (-0700) Subject: SFTP: 'sync' any open files at a direntry before opening any new file at that direntr... X-Git-Tag: trac-4400~2 X-Git-Url: https://git.rkrishnan.org/pf/content/en/service/rgr-080307.php?a=commitdiff_plain;h=0b888f82012e25e4cfb5af5904c222472286a185;p=tahoe-lafs%2Ftahoe-lafs.git SFTP: 'sync' any open files at a direntry before opening any new file at that direntry. This works around the sshfs misbehaviour of returning success to clients immediately on close. --- diff --git a/src/allmydata/frontends/sftpd.py b/src/allmydata/frontends/sftpd.py index b6ccefb9..45ff5e00 100644 --- a/src/allmydata/frontends/sftpd.py +++ b/src/allmydata/frontends/sftpd.py @@ -568,7 +568,7 @@ class ShortReadOnlySFTPFile(PrefixingLogMixin): d = defer.Deferred() def _read(data): - if noisy: self.log("_read(%r) in readChunk(%r, %r)" % (data, offset, length), level=NOISY) + if noisy: self.log("_read() in readChunk(%r, %r)" % (len(data), offset, length), level=NOISY) # "In response to this request, the server will read as many bytes as it # can from the file (up to 'len'), and return them in a SSH_FXP_DATA @@ -643,6 +643,7 @@ class GeneralSFTPFile(PrefixingLogMixin): # Creating or truncating the file is a change, but if FXF_EXCL is set, a zero-length file has already been created. self.has_changed = (flags & (FXF_CREAT | FXF_TRUNC)) and not (flags & FXF_EXCL) self.closed = False + self.added = False self.removed = False # self.consumer should only be relied on in callbacks for self.async, since it might @@ -688,6 +689,13 @@ class GeneralSFTPFile(PrefixingLogMixin): self.removed = True + def sync(self): + self.log(".sync()", level=OPERATIONAL) + + d = defer.Deferred() + self.async.addBoth(eventually_callback(d)) + return d + def readChunk(self, offset, length): request = ".readChunk(%r, %r)" % (offset, length) self.log(request, level=OPERATIONAL) @@ -727,9 +735,9 @@ class GeneralSFTPFile(PrefixingLogMixin): # Note that we return without waiting for the write to occur. Reads and # close wait for prior writes, and will fail if any prior operation failed. - # This is ok because SFTP makes no guarantee that the request completes - # before the write. In fact it explicitly allows write errors to be delayed - # until close: + # This is ok because SFTP makes no guarantee that the write completes + # before the request does. In fact it explicitly allows write errors to be + # delayed until close: # "One should note that on some server platforms even a close can fail. # This can happen e.g. if the server operating system caches writes, # and an error occurs while flushing cached writes during the close." @@ -767,10 +775,12 @@ class GeneralSFTPFile(PrefixingLogMixin): if self.has_changed and not self.removed: d2.addCallback(lambda ign: self.consumer.when_done()) if self.filenode and self.filenode.is_mutable(): + self.log("update mutable file %r childname=%r" % (self.filenode, self.childname,), level=OPERATIONAL) d2.addCallback(lambda ign: self.consumer.get_current_size()) d2.addCallback(lambda size: self.consumer.read(0, size)) d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents)) else: + self.added = True def _add_file(ign): self.log("_add_file childname=%r" % (self.childname,), level=OPERATIONAL) u = FileHandle(self.consumer.get_file(), self.convergence) @@ -785,12 +795,14 @@ class GeneralSFTPFile(PrefixingLogMixin): self.async.addCallbacks(eventually_callback(d), eventually_errback(d)) def _closed(res): + if noisy: self.log("_closed(%r)" % (res,), level=NOISY) self.close_notify(self.parent, self.childname, self) # It is possible for there to be a race between adding the file and removing it. - if self.removed: + if self.added and self.removed: self.log("oops, we added %r but must now remove it" % (self.childname,), level=OPERATIONAL) d2 = self.parent.delete(self.childname) + d2.addBoth(_convert_error, request) # just for logging d2.addBoth(lambda ign: res) return d2 return res @@ -899,6 +911,8 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): } def _add_open_files(self, direntry, files_to_add): + if noisy: self.log("._add_open_files(%r, %r)" % (direntry, files_to_add), level=NOISY) + if direntry: if direntry in self._open_files: self._open_files[direntry] += files_to_add @@ -912,23 +926,34 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): all_open_files[direntry] = (files_to_add, time()) def _remove_any_open_files(self, direntry): - removed = False + if noisy: self.log("._remove_any_open_files(%r)" % (direntry,), level=NOISY) + if direntry in self._open_files: - for f in self._open_files[direntry]: - f.remove() del self._open_files[direntry] - removed = True if direntry in all_open_files: (files, opentime) = all_open_files[direntry] for f in files: f.remove() del all_open_files[direntry] - removed = True + return True + + return False + + def _sync_open_files(self, direntry): + if noisy: self.log("._sync_open_files(%r)" % (direntry,), level=NOISY) - return removed + d = defer.succeed(None) + if direntry in all_open_files: + (files, opentime) = all_open_files[direntry] + for f in files: + d.addCallback(lambda ign: f.sync()) + + return d def _close_notify(self, parent, childname, file_to_remove): + if noisy: self.log("._close_notify(%r, %r, %r)" % (parent, childname, file_to_remove), level=NOISY) + direntry = self._direntry_for(parent, childname) if direntry in self._open_files: old_files = self._open_files[direntry] @@ -950,6 +975,9 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): """When an direntry is renamed, any open files for that direntry are also renamed. Return True if there were any open files at from_direntry.""" + if noisy: self.log("._rename_open_files(%r, %r, %r, %r)" % + (from_parent, from_childname, to_parent, to_childname), level=NOISY) + from_direntry = self._direntry_for(from_parent, from_childname) to_direntry = self._direntry_for(to_parent, to_childname) @@ -964,11 +992,13 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): else: return False - def _direntry_for(self, parent, childname): - if parent and childname: - rw_uri = parent.get_write_uri() - if rw_uri: + def _direntry_for(self, filenode_or_parent, childname=None): + if filenode_or_parent: + rw_uri = filenode_or_parent.get_write_uri() + if rw_uri and childname: return rw_uri + "/" + childname.encode('utf-8') + else: + return rw_uri return None @@ -987,17 +1017,24 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): assert metadata is None or 'readonly' in metadata, metadata writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0 - if not writing and (flags & FXF_READ) and filenode and not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD: - return ShortReadOnlySFTPFile(filenode, metadata) + if childname: + direntry = self._direntry_for(parent, childname) else: - direntry = None - if writing: - direntry = self._direntry_for(parent, childname) + direntry = self._direntry_for(filenode) - file = GeneralSFTPFile(self._close_notify, flags, self._convergence, - parent=parent, childname=childname, filenode=filenode, metadata=metadata) - self._add_open_files(direntry, [file]) - return file + d = self._sync_open_files(direntry) + + if not writing and (flags & FXF_READ) and filenode and not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD: + d.addCallback(lambda ign: ShortReadOnlySFTPFile(filenode, metadata)) + else: + d.addCallback(lambda ign: GeneralSFTPFile(self._close_notify, flags, self._convergence, + parent=parent, childname=childname, filenode=filenode, metadata=metadata)) + def _add_to_open(file): + if writing: + self._add_open_files(direntry, [file]) + return file + d.addCallback(_add_to_open) + return d def openFile(self, pathstring, flags, attrs): request = ".openFile(%r, %r = %r, %r)" % (pathstring, flags, _repr_flags(flags), attrs) @@ -1006,10 +1043,6 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): # This is used for both reading and writing. # First exclude invalid combinations of flags. - # /usr/bin/sftp 'get' gives us FXF_READ, while 'put' on a new file - # gives FXF_WRITE | FXF_CREAT | FXF_TRUNC. I'm guessing that 'put' on an - # existing file gives the same. - if not (flags & (FXF_READ | FXF_WRITE)): raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set") @@ -1040,9 +1073,14 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): # d. the child is immutable: if we are trying to open it write-only or # read/write, then we must be able to write to the parent directory. # - # To reduce latency, open succeeds as soon as these conditions are met, even - # though there might be a failure in downloading the existing file or uploading - # a new one. + # To reduce latency, open normally succeeds as soon as these conditions are + # met, even though there might be a failure in downloading the existing file + # or uploading a new one. However, there is an exception: if a file has been + # written, then closed, and is now being reopened, then we have to delay the + # open until the previous upload/publish has completed. This is necessary + # because sshfs does not wait for the result of an FXF_CLOSE message before + # reporting to the client that a file has been closed. It applies both to + # mutable files, and to directory entries linked to an immutable file. # # Note that the permission checks below are for more precise error reporting on # the open call; later operations would fail even if we did not make these checks. @@ -1268,7 +1306,8 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): if childname is None: raise SFTPError(FX_NO_SUCH_FILE, "cannot remove an object specified by URI") - removed = self._remove_any_open_files(self._direntry_for(parent, childname)) + direntry = self._direntry_for(parent, childname) + removed = self._remove_any_open_files(direntry) d2 = parent.get(childname) def _got_child(child): @@ -1281,6 +1320,8 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): def _no_child(err): if removed and err.check(NoSuchChildError): # suppress NoSuchChildError if an open file was removed + if noisy: self.log("suppressing NoSuchChildError for %r because it was removed as an open file" % + (direntry,), level=NOISY) return None else: return err @@ -1339,6 +1380,11 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): # reported as the update time of the best version. But that # information isn't currently stored in mutable shares, I think. + # TODO: some clients will incorrectly try to get the attributes + # of a file immediately after opening it, before it has been put + # into the all_open_files table. This is a race condition bug in + # the client, but we probably need to handle it anyway. + path = self._path_from_string(pathstring) d = self._get_parent_or_node(path) def _got_parent_or_node( (parent_or_node, childname) ):