From: david-sarah Date: Sat, 29 May 2010 04:52:53 +0000 (-0700) Subject: SFTP: cater to clients that assume a file is created as soon as they have made an... X-Git-Tag: trac-4400~1 X-Git-Url: https://git.rkrishnan.org/pf/content/%22file:/frontends/reliability?a=commitdiff_plain;h=e8679855399f6fd6b3f7062da79df745625ddb6c;p=tahoe-lafs%2Ftahoe-lafs.git SFTP: cater to clients that assume a file is created as soon as they have made an open request; also, fix some race conditions associated with closing a file at about the same time as renaming or removing it. --- diff --git a/src/allmydata/frontends/sftpd.py b/src/allmydata/frontends/sftpd.py index 45ff5e00..e0516d81 100644 --- a/src/allmydata/frontends/sftpd.py +++ b/src/allmydata/frontends/sftpd.py @@ -27,7 +27,7 @@ from allmydata.util import deferredutil from allmydata.util.consumer import download_to_data from allmydata.interfaces import IFileNode, IDirectoryNode, ExistingChildError, \ - NoSuchChildError + NoSuchChildError, ChildOfWrongTypeError from allmydata.mutable.common import NotWriteableError from allmydata.immutable.upload import FileHandle @@ -43,7 +43,7 @@ warnings.filterwarnings("ignore", category=DeprecationWarning, noisy = True use_foolscap_logging = True -from allmydata.util.log import NOISY, OPERATIONAL, \ +from allmydata.util.log import NOISY, OPERATIONAL, WEIRD, \ msg as _msg, err as _err, PrefixingLogMixin as _PrefixingLogMixin if use_foolscap_logging: @@ -54,10 +54,10 @@ else: # pragma: no cover def logerr(s, level=None): print s class PrefixingLogMixin: - def __init__(self, facility=None): - pass + def __init__(self, facility=None, prefix=''): + self.prefix = prefix def log(self, s, level=None): - print s + print "%r %s" % (self.prefix, s) def eventually_callback(d): @@ -105,7 +105,7 @@ def _convert_error(res, request): if err.check(NoSuchChildError): childname = _utf8(err.value.args[0]) raise SFTPError(FX_NO_SUCH_FILE, childname) - if err.check(NotWriteableError): + if err.check(NotWriteableError) or err.check(ChildOfWrongTypeError): msg = _utf8(err.value.args[0]) raise SFTPError(FX_PERMISSION_DENIED, msg) if err.check(ExistingChildError): @@ -160,12 +160,7 @@ def _lsLine(name, attrs): perms = array.array('c', '-'*10) ft = stat.S_IFMT(mode) if stat.S_ISDIR(ft): perms[0] = 'd' - elif stat.S_ISCHR(ft): perms[0] = 'c' - elif stat.S_ISBLK(ft): perms[0] = 'b' elif stat.S_ISREG(ft): perms[0] = '-' - elif stat.S_ISFIFO(ft): perms[0] = 'f' - elif stat.S_ISLNK(ft): perms[0] = 'l' - elif stat.S_ISSOCK(ft): perms[0] = 's' else: perms[0] = '?' # user if mode&stat.S_IRUSR: perms[1] = 'r' @@ -277,7 +272,7 @@ class EncryptedTemporaryFile(PrefixingLogMixin): self.key = os.urandom(16) # AES-128 def _crypt(self, offset, data): - # FIXME: use random-access AES (pycryptopp ticket #18) + # TODO: use random-access AES (pycryptopp ticket #18) offset_big = offset // 16 offset_small = offset % 16 iv = binascii.unhexlify("%032x" % offset_big) @@ -329,7 +324,7 @@ class OverwriteableFileConsumer(PrefixingLogMixin): The temporary file reflects the contents of the file that I represent, except that: - regions that have neither been downloaded nor overwritten, if present, - contain zeroes. + contain garbage. - the temporary file may be shorter than the represented file (it is never longer). The latter's current size is stored in self.current_size. @@ -365,7 +360,11 @@ class OverwriteableFileConsumer(PrefixingLogMixin): (size, self.current_size, self.downloaded), level=NOISY) if size < self.current_size or size < self.downloaded: self.f.truncate(size) + if size > self.current_size: + self.overwrite(self.current_size, "\x00" * (size - self.current_size)) self.current_size = size + + # invariant: self.download_size <= self.current_size if size < self.download_size: self.download_size = size if self.downloaded >= self.download_size: @@ -453,7 +452,7 @@ class OverwriteableFileConsumer(PrefixingLogMixin): def overwrite(self, offset, data): if noisy: self.log(".overwrite(%r, )" % (offset, len(data)), level=NOISY) - if offset > self.download_size and offset > self.current_size: + if offset > self.current_size: # Normally writing at an offset beyond the current end-of-file # would leave a hole that appears filled with zeroes. However, an # EncryptedTemporaryFile doesn't behave like that (if there is a @@ -462,14 +461,17 @@ class OverwriteableFileConsumer(PrefixingLogMixin): # the gap between the current EOF and the offset. self.f.seek(self.current_size) - self.f.write("\x00" * (offset - self.current_size)) + self.f.write("\x00" * (offset - self.current_size)) + start = self.current_size else: self.f.seek(offset) + start = offset + self.f.write(data) end = offset + len(data) self.current_size = max(self.current_size, end) if end > self.downloaded: - heapq.heappush(self.overwrites, (offset, end)) + heapq.heappush(self.overwrites, (start, end)) def read(self, offset, length): """When the data has been read, callback the Deferred that we return with this data. @@ -531,7 +533,10 @@ class OverwriteableFileConsumer(PrefixingLogMixin): def close(self): self.is_closed = True self.finish() - self.f.close() + try: + self.f.close() + except EnvironmentError as e: + self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD) def unregisterProducer(self): if self.producer: @@ -548,9 +553,9 @@ class ShortReadOnlySFTPFile(PrefixingLogMixin): I am used only for short immutable files opened in read-only mode. The file contents are downloaded to memory when I am created.""" - def __init__(self, filenode, metadata): - PrefixingLogMixin.__init__(self, facility="tahoe.sftp") - if noisy: self.log(".__init__(%r, %r)" % (filenode, metadata), level=NOISY) + def __init__(self, userpath, filenode, metadata): + PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath) + if noisy: self.log(".__init__(%r, %r, %r)" % (userpath, filenode, metadata), level=NOISY) assert IFileNode.providedBy(filenode), filenode self.filenode = filenode @@ -625,69 +630,90 @@ class GeneralSFTPFile(PrefixingLogMixin): storing the file contents. In order to allow write requests to be satisfied immediately, there is effectively a FIFO queue between requests made to this file handle, and requests to my OverwriteableFileConsumer. This queue is - implemented by the callback chain of self.async.""" + implemented by the callback chain of self.async. - def __init__(self, close_notify, flags, convergence, parent=None, childname=None, filenode=None, metadata=None): - PrefixingLogMixin.__init__(self, facility="tahoe.sftp") - if noisy: self.log(".__init__(%r, %r, , parent=%r, childname=%r, filenode=%r, metadata=%r)" % - (close_notify, flags, parent, childname, filenode, metadata), level=NOISY) + When first constructed, I am in an 'unopened' state that causes most + operations to be delayed until 'open' is called.""" - self.close_notify = close_notify + def __init__(self, userpath, flags, close_notify, convergence): + PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath) + if noisy: self.log(".__init__(%r, %r = %r, %r, )" % + (userpath, flags, _repr_flags(flags), close_notify), level=NOISY) + + self.userpath = userpath self.flags = flags + self.close_notify = close_notify self.convergence = convergence - self.parent = parent - self.childname = childname - self.filenode = filenode - self.metadata = metadata - self.async = defer.succeed(None) + self.async = defer.Deferred() # Creating or truncating the file is a change, but if FXF_EXCL is set, a zero-length file has already been created. self.has_changed = (flags & (FXF_CREAT | FXF_TRUNC)) and not (flags & FXF_EXCL) self.closed = False - self.added = False - self.removed = False - + self.abandoned = False + self.parent = None + self.childname = None + self.filenode = None + self.metadata = None + # self.consumer should only be relied on in callbacks for self.async, since it might # not be set before then. self.consumer = None - tempfile_maker = EncryptedTemporaryFile - if (flags & FXF_TRUNC) or not filenode: - # We're either truncating or creating the file, so we don't need the old contents. - self.consumer = OverwriteableFileConsumer(0, tempfile_maker) - self.consumer.finish() - else: - assert IFileNode.providedBy(filenode), filenode - - # TODO: use download interface described in #993 when implemented. - if filenode.is_mutable(): - self.async.addCallback(lambda ign: filenode.download_best_version()) - def _downloaded(data): - self.consumer = OverwriteableFileConsumer(len(data), tempfile_maker) - self.consumer.write(data) - self.consumer.finish() - return None - self.async.addCallback(_downloaded) + def open(self, parent=None, childname=None, filenode=None, metadata=None): + self.log(".open(parent=%r, childname=%r, filenode=%r, metadata=%r)" % + (parent, childname, filenode, metadata), level=OPERATIONAL) + + # If the file has been renamed, the new (parent, childname) takes precedence. + if self.parent is None: + self.parent = parent + if self.childname is None: + self.childname = childname + self.filenode = filenode + self.metadata = metadata + + if not self.closed: + tempfile_maker = EncryptedTemporaryFile + + if (self.flags & FXF_TRUNC) or not filenode: + # We're either truncating or creating the file, so we don't need the old contents. + self.consumer = OverwriteableFileConsumer(0, tempfile_maker) + self.consumer.finish() else: - download_size = filenode.get_size() - assert download_size is not None, "download_size is None" - self.consumer = OverwriteableFileConsumer(download_size, tempfile_maker) - def _read(ign): - if noisy: self.log("_read immutable", level=NOISY) - filenode.read(self.consumer, 0, None) - self.async.addCallback(_read) + assert IFileNode.providedBy(filenode), filenode + + # TODO: use download interface described in #993 when implemented. + if filenode.is_mutable(): + self.async.addCallback(lambda ign: filenode.download_best_version()) + def _downloaded(data): + self.consumer = OverwriteableFileConsumer(len(data), tempfile_maker) + self.consumer.write(data) + self.consumer.finish() + return None + self.async.addCallback(_downloaded) + else: + download_size = filenode.get_size() + assert download_size is not None, "download_size is None" + self.consumer = OverwriteableFileConsumer(download_size, tempfile_maker) + def _read(ign): + if noisy: self.log("_read immutable", level=NOISY) + filenode.read(self.consumer, 0, None) + self.async.addCallback(_read) + + eventually_callback(self.async)(None) - if noisy: self.log("__init__ done", level=NOISY) + if noisy: self.log("open done", level=NOISY) + return self - def rename(self, new_parent, new_childname): - self.log(".rename(%r, %r)" % (new_parent, new_childname), level=OPERATIONAL) + def rename(self, new_userpath, new_parent, new_childname): + self.log(".rename(%r, %r, %r)" % (new_userpath, new_parent, new_childname), level=OPERATIONAL) + self.userpath = new_userpath self.parent = new_parent self.childname = new_childname - def remove(self): - self.log(".remove()", level=OPERATIONAL) + def abandon(self): + self.log(".abandon()", level=OPERATIONAL) - self.removed = True + self.abandoned = True def sync(self): self.log(".sync()", level=OPERATIONAL) @@ -712,7 +738,6 @@ class GeneralSFTPFile(PrefixingLogMixin): def _read(ign): if noisy: self.log("_read in readChunk(%r, %r)" % (offset, length), level=NOISY) d2 = self.consumer.read(offset, length) - d2.addErrback(_convert_error, request) d2.addCallbacks(eventually_callback(d), eventually_errback(d)) # It is correct to drop d2 here. return None @@ -768,11 +793,21 @@ class GeneralSFTPFile(PrefixingLogMixin): self.closed = True if not (self.flags & (FXF_WRITE | FXF_CREAT)): - return defer.execute(self.consumer.close) - + def _readonly_close(): + if self.consumer: + self.consumer.close() + return defer.execute(_readonly_close) + + # We must capture the abandoned, parent, and childname variables synchronously + # at the close call. This is needed by the correctness arguments in the comments + # for _abandon_any_heisenfiles and _rename_heisenfiles. + abandoned = self.abandoned + parent = self.parent + childname = self.childname + def _close(ign): d2 = defer.succeed(None) - if self.has_changed and not self.removed: + if self.has_changed and not abandoned: d2.addCallback(lambda ign: self.consumer.when_done()) if self.filenode and self.filenode.is_mutable(): self.log("update mutable file %r childname=%r" % (self.filenode, self.childname,), level=OPERATIONAL) @@ -780,33 +815,28 @@ class GeneralSFTPFile(PrefixingLogMixin): d2.addCallback(lambda size: self.consumer.read(0, size)) d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents)) else: - self.added = True def _add_file(ign): - self.log("_add_file childname=%r" % (self.childname,), level=OPERATIONAL) + self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL) u = FileHandle(self.consumer.get_file(), self.convergence) - return self.parent.add_file(self.childname, u) + return parent.add_file(childname, u) d2.addCallback(_add_file) - d2.addCallback(lambda ign: self.consumer.close()) + def _committed(res): + if noisy: self.log("_committed(%r)" % (res,), level=NOISY) + + self.consumer.close() + + # We must close_notify before re-firing self.async. + if self.close_notify: + self.close_notify(self.userpath, self.parent, self.childname, self) + return res + d2.addBoth(_committed) return d2 + self.async.addCallback(_close) d = defer.Deferred() self.async.addCallbacks(eventually_callback(d), eventually_errback(d)) - - def _closed(res): - if noisy: self.log("_closed(%r)" % (res,), level=NOISY) - self.close_notify(self.parent, self.childname, self) - - # It is possible for there to be a race between adding the file and removing it. - if self.added and self.removed: - self.log("oops, we added %r but must now remove it" % (self.childname,), level=OPERATIONAL) - d2 = self.parent.delete(self.childname) - d2.addBoth(_convert_error, request) # just for logging - d2.addBoth(lambda ign: res) - return d2 - return res - d.addBoth(_closed) d.addBoth(_convert_error, request) return d @@ -877,18 +907,22 @@ class Reason: self.value = value -# For each immutable file that has been opened with write flags -# (FXF_WRITE and/or FXF_CREAT) and is still open, this maps from -# parent_write_uri + "/" + childname_utf8, to (list_of_ISFTPFile, open_time_utc). +# A "heisenfile" is a file that has been opened with write flags +# (FXF_WRITE and/or FXF_CREAT) and not yet close-notified. +# 'all_heisenfiles' maps from a direntry string to +# (list_of_GeneralSFTPFile, open_time_utc). +# A direntry string is parent_write_uri + "/" + childname_utf8 for +# an immutable file, or file_write_uri for a mutable file. # Updates to this dict are single-threaded. -all_open_files = {} +all_heisenfiles = {} + class SFTPUserHandler(ConchUser, PrefixingLogMixin): implements(ISFTPServer) def __init__(self, client, rootnode, username): ConchUser.__init__(self) - PrefixingLogMixin.__init__(self, facility="tahoe.sftp") + PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=username) if noisy: self.log(".__init__(%r, %r, %r)" % (client, rootnode, username), level=NOISY) self.channelLookup["session"] = session.SSHSession @@ -898,7 +932,9 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): self._root = rootnode self._username = username self._convergence = client.convergence - self._open_files = {} # files created by this user handler and still open + + # maps from UTF-8 paths for this user, to files written and still open + self._heisenfiles = {} def gotVersion(self, otherVersion, extData): self.log(".gotVersion(%r, %r)" % (otherVersion, extData), level=OPERATIONAL) @@ -910,87 +946,164 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): 'fstatvfs@openssh.com': '2', } - def _add_open_files(self, direntry, files_to_add): - if noisy: self.log("._add_open_files(%r, %r)" % (direntry, files_to_add), level=NOISY) + def logout(self): + self.log(".logout()", level=OPERATIONAL) + + for files in self._heisenfiles.itervalues(): + for f in files: + f.abandon() + + def _add_heisenfiles_by_path(self, userpath, files): + if noisy: self.log("._add_heisenfiles_by_path(%r, %r)" % (userpath, files), level=NOISY) + + if userpath in self._heisenfiles: + self._heisenfiles[userpath] += files + else: + self._heisenfiles[userpath] = files + + def _add_heisenfiles_by_direntry(self, direntry, files_to_add): + if noisy: self.log("._add_heisenfiles_by_direntry(%r, %r)" % (direntry, files_to_add), level=NOISY) if direntry: - if direntry in self._open_files: - self._open_files[direntry] += files_to_add + if direntry in all_heisenfiles: + (old_files, opentime) = all_heisenfiles[direntry] + all_heisenfiles[direntry] = (old_files + files_to_add, opentime) else: - self._open_files[direntry] = files_to_add + all_heisenfiles[direntry] = (files_to_add, time()) - if direntry in all_open_files: - (old_files, opentime) = all_open_files[direntry] - all_open_files[direntry] = (old_files + files_to_add, opentime) - else: - all_open_files[direntry] = (files_to_add, time()) + def _abandon_any_heisenfiles(self, userpath, direntry): + if noisy: self.log("._abandon_any_heisenfiles(%r, %r)" % (userpath, direntry), level=NOISY) - def _remove_any_open_files(self, direntry): - if noisy: self.log("._remove_any_open_files(%r)" % (direntry,), level=NOISY) + # First we synchronously mark all heisenfiles matching the userpath or direntry + # as abandoned, and remove them from the two heisenfile dicts. Then we .sync() + # each file that we abandoned. + # + # For each file, the call to .abandon() occurs: + # * before the file is closed, in which case it will never be committed + # (uploaded+linked or published); or + # * after it is closed but before it has been close_notified, in which case the + # .sync() ensures that it has been committed (successfully or not) before we + # return. + # + # This avoids a race that might otherwise cause the file to be committed after + # the remove operation has completed. + # + # We return a Deferred that fires with True if any files were abandoned (this + # does not mean that they were not committed; it is used to determine whether + # a NoSuchChildError from the attempt to delete the file should be suppressed). - if direntry in self._open_files: - del self._open_files[direntry] + files = [] + if direntry in all_heisenfiles: + (files, opentime) = all_heisenfiles[direntry] + del all_heisenfiles[direntry] + if userpath in self._heisenfiles: + files += self._heisenfiles[userpath] + del self._heisenfiles[userpath] - if direntry in all_open_files: - (files, opentime) = all_open_files[direntry] - for f in files: - f.remove() - del all_open_files[direntry] - return True + for f in files: + f.abandon() - return False + d = defer.succeed(None) + for f in files: + d.addBoth(lambda ign: f.sync()) - def _sync_open_files(self, direntry): - if noisy: self.log("._sync_open_files(%r)" % (direntry,), level=NOISY) + d.addBoth(lambda ign: len(files) > 0) + return d + + def _rename_heisenfiles(self, from_userpath, from_parent, from_childname, + to_userpath, to_parent, to_childname, overwrite=True): + if noisy: self.log("._rename_heisenfiles(%r, %r, %r, %r, %r, %r, overwrite=%r)" % + (from_userpath, from_parent, from_childname, + to_userpath, to_parent, to_childname, overwrite), level=NOISY) + + # First we synchronously rename all heisenfiles matching the userpath or direntry. + # Then we .sync() each file that we renamed. + # + # For each file, the call to .rename occurs: + # * before the file is closed, in which case it will be committed at the + # new direntry; or + # * after it is closed but before it has been close_notified, in which case the + # .sync() ensures that it has been committed (successfully or not) before we + # return. + # + # This avoids a race that might otherwise cause the file to be committed at the + # old name after the rename operation has completed. + # + # Note that if overwrite is False, the caller should already have checked + # whether a real direntry exists at the destination. It is possible that another + # direntry (heisen or real) comes to exist at the destination after that check, + # but in that case it is correct for the rename to succeed (and for the commit + # of the heisenfile at the destination to possibly clobber the other entry, since + # that can happen anyway when we have concurrent write handles to the same direntry). + # + # We return a Deferred that fires with True if any files were renamed (this + # does not mean that they were not committed; it is used to determine whether + # a NoSuchChildError from the rename attempt should be suppressed). If overwrite + # is False and there were already heisenfiles at the destination userpath or + # direntry, we return a Deferred that fails with SFTPError(FX_PERMISSION_DENIED). + + from_direntry = self._direntry_for(from_parent, from_childname) + to_direntry = self._direntry_for(to_parent, to_childname) + + if not overwrite and (to_userpath in self._heisenfiles or to_direntry in all_heisenfiles): + def _existing(): raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath) + return defer.execute(_existing) + + from_files = [] + if from_direntry in all_heisenfiles: + (from_files, opentime) = all_heisenfiles[from_direntry] + del all_heisenfiles[from_direntry] + if from_userpath in self._heisenfiles: + from_files += self._heisenfiles[from_userpath] + del self._heisenfiles[from_userpath] + + self._add_heisenfiles_by_direntry(to_direntry, from_files) + self._add_heisenfiles_by_path(to_userpath, from_files) + + for f in from_files: + f.rename(to_userpath, to_parent, to_childname) d = defer.succeed(None) - if direntry in all_open_files: - (files, opentime) = all_open_files[direntry] - for f in files: - d.addCallback(lambda ign: f.sync()) + for f in from_files: + d.addBoth(lambda ign: f.sync()) + d.addBoth(lambda ign: len(from_files) > 0) return d - def _close_notify(self, parent, childname, file_to_remove): - if noisy: self.log("._close_notify(%r, %r, %r)" % (parent, childname, file_to_remove), level=NOISY) + def _sync_heisenfiles(self, userpath, direntry, ignore=None): + if noisy: self.log("._sync_heisenfiles(%r, %r)" % (userpath, direntry), level=NOISY) - direntry = self._direntry_for(parent, childname) - if direntry in self._open_files: - old_files = self._open_files[direntry] - new_files = [f for f in old_files if f is not file_to_remove] - if len(new_files) > 0: - self._open_files[direntry] = new_files - else: - del self._open_files[direntry] + files = [] + if direntry in all_heisenfiles: + (files, opentime) = all_heisenfiles[direntry] + if userpath in self._heisenfiles: + files += self._heisenfiles[userpath] - if direntry in all_open_files: - (all_old_files, opentime) = all_open_files[direntry] + d = defer.succeed(None) + for f in files: + if f is not ignore: + d.addCallback(lambda ign: f.sync()) + return d + + def _remove_heisenfile(self, userpath, parent, childname, file_to_remove): + if noisy: self.log("._remove_file(%r, %r, %r, %r)" % (userpath, parent, childname, file_to_remove), level=NOISY) + + direntry = self._direntry_for(parent, childname) + if direntry in all_heisenfiles: + (all_old_files, opentime) = all_heisenfiles[direntry] all_new_files = [f for f in all_old_files if f is not file_to_remove] if len(all_new_files) > 0: - all_open_files[direntry] = (all_new_files, opentime) + all_heisenfiles[direntry] = (all_new_files, opentime) else: - del all_open_files[direntry] - - def _rename_open_files(self, from_parent, from_childname, to_parent, to_childname): - """When an direntry is renamed, any open files for that direntry are also renamed. - Return True if there were any open files at from_direntry.""" + del all_heisenfiles[direntry] - if noisy: self.log("._rename_open_files(%r, %r, %r, %r)" % - (from_parent, from_childname, to_parent, to_childname), level=NOISY) - - from_direntry = self._direntry_for(from_parent, from_childname) - to_direntry = self._direntry_for(to_parent, to_childname) - - if from_direntry in all_open_files: - (from_files, opentime) = all_open_files[from_direntry] - del self._open_files[from_direntry] - del all_open_files[from_direntry] - for file in from_files: - file.rename(to_parent, to_childname) - self._add_open_files(to_direntry, from_files) - return True - else: - return False + if userpath in self._heisenfiles: + old_files = self._heisenfiles[userpath] + new_files = [f for f in old_files if f is not file_to_remove] + if len(new_files) > 0: + self._heisenfiles[userpath] = new_files + else: + del self._heisenfiles[userpath] def _direntry_for(self, filenode_or_parent, childname=None): if filenode_or_parent: @@ -1002,38 +1115,34 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): return None - def logout(self): - self.log(".logout()", level=OPERATIONAL) - - for (direntry, files_at_direntry) in enumerate(self._open_files): - for f in files_at_direntry: - f.remove() - f.close() - - def _make_file(self, flags, parent=None, childname=None, filenode=None, metadata=None): - if noisy: self.log("._make_file(%r = %r, parent=%r, childname=%r, filenode=%r, metadata=%r" % - (flags, _repr_flags(flags), parent, childname, filenode, metadata), level=NOISY) + def _make_file(self, existing_file, userpath, flags, parent=None, childname=None, filenode=None, metadata=None): + if noisy: self.log("._make_file(%r, %r, %r = %r, parent=%r, childname=%r, filenode=%r, metadata=%r)" % + (existing_file, userpath, flags, _repr_flags(flags), parent, childname, filenode, metadata), + level=NOISY) assert metadata is None or 'readonly' in metadata, metadata - writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0 + writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0 if childname: direntry = self._direntry_for(parent, childname) else: direntry = self._direntry_for(filenode) - d = self._sync_open_files(direntry) + d = self._sync_heisenfiles(userpath, direntry, ignore=existing_file) if not writing and (flags & FXF_READ) and filenode and not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD: - d.addCallback(lambda ign: ShortReadOnlySFTPFile(filenode, metadata)) + d.addCallback(lambda ign: ShortReadOnlySFTPFile(userpath, filenode, metadata)) else: - d.addCallback(lambda ign: GeneralSFTPFile(self._close_notify, flags, self._convergence, - parent=parent, childname=childname, filenode=filenode, metadata=metadata)) - def _add_to_open(file): + close_notify = None + if writing: + close_notify = self._remove_heisenfile + + d.addCallback(lambda ign: existing_file or GeneralSFTPFile(userpath, flags, close_notify, self._convergence)) + def _got_file(file): if writing: - self._add_open_files(direntry, [file]) - return file - d.addCallback(_add_to_open) + self._add_heisenfiles_by_direntry(direntry, [file]) + return file.open(parent=parent, childname=childname, filenode=filenode, metadata=metadata) + d.addCallback(_got_file) return d def openFile(self, pathstring, flags, attrs): @@ -1041,21 +1150,43 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): self.log(request, level=OPERATIONAL) # This is used for both reading and writing. - # First exclude invalid combinations of flags. + # First exclude invalid combinations of flags, and empty paths. if not (flags & (FXF_READ | FXF_WRITE)): - raise SFTPError(FX_BAD_MESSAGE, - "invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set") + def _bad_readwrite(): + raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set") + return defer.execute(_bad_readwrite) if (flags & FXF_EXCL) and not (flags & FXF_CREAT): - raise SFTPError(FX_BAD_MESSAGE, - "invalid file open flags: FXF_EXCL cannot be set without FXF_CREAT") + def _bad_exclcreat(): + raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: FXF_EXCL cannot be set without FXF_CREAT") + return defer.execute(_bad_exclcreat) path = self._path_from_string(pathstring) if not path: - raise SFTPError(FX_NO_SUCH_FILE, "path cannot be empty") + def _emptypath(): raise SFTPError(FX_NO_SUCH_FILE, "path cannot be empty") + return defer.execute(_emptypath) + + # The combination of flags is potentially valid. - # The combination of flags is potentially valid. Now there are two major cases: + # To work around clients that have race condition bugs, a getAttr, rename, or + # remove request following an 'open' request with FXF_WRITE or FXF_CREAT flags, + # should succeed even if the 'open' request has not yet completed. So we now + # synchronously add a file object into the self._heisenfiles dict, indexed + # by its UTF-8 userpath. (We can't yet add it to the all_heisenfiles dict, + # because we don't yet have a user-independent path for the file.) The file + # object does not know its filenode, parent, or childname at this point. + + userpath = self._path_to_utf8(path) + + if flags & (FXF_WRITE | FXF_CREAT): + file = GeneralSFTPFile(userpath, flags, self._remove_heisenfile, self._convergence) + self._add_heisenfiles_by_path(userpath, [file]) + else: + # We haven't decided which file implementation to use yet. + file = None + + # Now there are two major cases: # # 1. The path is specified as /uri/FILECAP, with no parent directory. # If the FILECAP is mutable and writeable, then we can open it in write-only @@ -1104,10 +1235,10 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): raise SFTPError(FX_FAILURE, "cannot create a file exclusively when it already exists") - # The file does not need to be added to all_open_files, because it is not + # The file does not need to be added to all_heisenfiles, because it is not # associated with a directory entry that needs to be updated. - return self._make_file(flags, filenode=root) + return self._make_file(file, userpath, flags, filenode=root) else: # case 2 childname = path[-1] @@ -1170,7 +1301,8 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): "cannot open a file for writing when the parent directory is read-only") metadata['readonly'] = _is_readonly(parent_readonly, filenode) - return self._make_file(flags, parent=parent, childname=childname, filenode=filenode, metadata=metadata) + return self._make_file(file, userpath, flags, parent=parent, childname=childname, + filenode=filenode, metadata=metadata) def _no_child(f): if noisy: self.log("_no_child(%r)" % (f,), level=NOISY) f.trap(NoSuchChildError) @@ -1182,7 +1314,7 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): raise SFTPError(FX_PERMISSION_DENIED, "cannot create a file when the parent directory is read-only") - return self._make_file(flags, parent=parent, childname=childname) + return self._make_file(file, userpath, flags, parent=parent, childname=childname) d3.addCallbacks(_got_child, _no_child) return d3 @@ -1190,6 +1322,11 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): return d2 d.addCallback(_got_root) + def _remove_on_error(err): + if file: + self._remove_heisenfile(userpath, None, None, file) + return err + d.addErrback(_remove_on_error) d.addBoth(_convert_error, request) return d @@ -1199,16 +1336,18 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): from_path = self._path_from_string(from_pathstring) to_path = self._path_from_string(to_pathstring) + from_userpath = self._path_to_utf8(from_path) + to_userpath = self._path_to_utf8(to_path) # the target directory must already exist d = deferredutil.gatherResults([self._get_parent_or_node(from_path), self._get_parent_or_node(to_path)]) def _got( (from_pair, to_pair) ): - if noisy: self.log("_got( (%r, %r) ) in .renameFile(%r, %r)" % - (from_pair, to_pair, from_pathstring, to_pathstring), level=NOISY) + if noisy: self.log("_got( (%r, %r) ) in .renameFile(%r, %r, overwrite=%r)" % + (from_pair, to_pair, from_pathstring, to_pathstring, overwrite), level=NOISY) (from_parent, from_childname) = from_pair (to_parent, to_childname) = to_pair - + if from_childname is None: raise SFTPError(FX_NO_SUCH_FILE, "cannot rename a source object specified by URI") if to_childname is None: @@ -1217,33 +1356,47 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): # # "It is an error if there already exists a file with the name specified # by newpath." + # OpenSSH's SFTP server returns FX_PERMISSION_DENIED for this error. + # # For the standard SSH_FXP_RENAME operation, overwrite=False. # We also support the posix-rename@openssh.com extension, which uses overwrite=True. - # FIXME: use move_child_to_path to avoid possible data loss due to #943 - #d2 = from_parent.move_child_to_path(from_childname, to_root, to_path, overwrite=overwrite) - - d2 = from_parent.move_child_to(from_childname, to_parent, to_childname, overwrite=overwrite) - def _check(err): - if noisy: self.log("_check(%r) in .renameFile(%r, %r)" % - (err, from_pathstring, to_pathstring), level=NOISY) - - if not isinstance(err, Failure) or err.check(NoSuchChildError): - # If there are open files to be written at the 'from' direntry, then ensure - # they will now be written at the 'to' direntry instead. - if noisy: self.log("checking open files:\nself._open_files = %r\nall_open_files = %r" % - (self._open_files, all_open_files), level=NOISY) - if self._rename_open_files(from_parent, from_childname, to_parent, to_childname): - # suppress the NoSuchChildError if any open files were renamed - if noisy: self.log("after renaming:\nself._open_files = %r\nall_open_files = %r" % - (self._open_files, all_open_files), level=NOISY) + d2 = defer.fail(NoSuchChildError()) + if not overwrite: + d2.addCallback(lambda ign: to_parent.get(to_childname)) + def _expect_fail(res): + if not isinstance(res, Failure): + raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath) + + # It is OK if we fail for errors other than NoSuchChildError, since that probably + # indicates some problem accessing the destination directory. + res.trap(NoSuchChildError) + d2.addBoth(_expect_fail) + + # If there are heisenfiles to be written at the 'from' direntry, then ensure + # they will now be written at the 'to' direntry instead. + d2.addCallback(lambda ign: + self._rename_heisenfiles(from_userpath, from_parent, from_childname, + to_userpath, to_parent, to_childname, overwrite=overwrite)) + + def _move(renamed): + # FIXME: use move_child_to_path to avoid possible data loss due to #943 + #d3 = from_parent.move_child_to_path(from_childname, to_root, to_path, overwrite=overwrite) + + d3 = from_parent.move_child_to(from_childname, to_parent, to_childname, overwrite=overwrite) + def _check(err): + if noisy: self.log("_check(%r) in .renameFile(%r, %r, overwrite=%r)" % + (err, from_pathstring, to_pathstring, overwrite), level=NOISY) + + if not isinstance(err, Failure) or (renamed and err.check(NoSuchChildError)): return None - elif err.check(ExistingChildError): - # OpenSSH SFTP server returns FX_PERMISSION_DENIED - raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_pathstring) + if not overwrite and err.check(ExistingChildError): + raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath) - return err - d2.addBoth(_check) + return err + d3.addBoth(_check) + return d3 + d2.addCallback(_move) return d2 d.addCallback(_got) d.addBoth(_convert_error, request) @@ -1297,35 +1450,20 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): return d def _remove_object(self, path, must_be_directory=False, must_be_file=False): - d = defer.maybeDeferred(self._get_parent_or_node, path) + userpath = self._path_to_utf8(path) + d = self._get_parent_or_node(path) def _got_parent( (parent, childname) ): - # FIXME (minor): there is a race condition between the 'get' and 'delete', - # so it is possible that the must_be_directory or must_be_file restrictions - # might not be enforced correctly if the type has just changed. - if childname is None: raise SFTPError(FX_NO_SUCH_FILE, "cannot remove an object specified by URI") direntry = self._direntry_for(parent, childname) - removed = self._remove_any_open_files(direntry) - - d2 = parent.get(childname) - def _got_child(child): - # Unknown children can be removed by either removeFile or removeDirectory. - if must_be_directory and IFileNode.providedBy(child): - raise SFTPError(FX_PERMISSION_DENIED, "rmdir called on a file") - if must_be_file and IDirectoryNode.providedBy(child): - raise SFTPError(FX_PERMISSION_DENIED, "rmfile called on a directory") - return parent.delete(childname) - def _no_child(err): - if removed and err.check(NoSuchChildError): - # suppress NoSuchChildError if an open file was removed - if noisy: self.log("suppressing NoSuchChildError for %r because it was removed as an open file" % - (direntry,), level=NOISY) - return None - else: - return err - d2.addCallbacks(_got_child, _no_child) + d2 = defer.succeed(False) + if not must_be_directory: + d2.addCallback(lambda ign: self._abandon_any_heisenfiles(userpath, direntry)) + + d2.addCallback(lambda abandoned: + parent.delete(childname, must_exist=not abandoned, + must_be_directory=must_be_directory, must_be_file=must_be_file)) return d2 d.addCallback(_got_parent) return d @@ -1380,24 +1518,28 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): # reported as the update time of the best version. But that # information isn't currently stored in mutable shares, I think. - # TODO: some clients will incorrectly try to get the attributes + # Some clients will incorrectly try to get the attributes # of a file immediately after opening it, before it has been put - # into the all_open_files table. This is a race condition bug in + # into the all_heisenfiles table. This is a race condition bug in # the client, but we probably need to handle it anyway. path = self._path_from_string(pathstring) + userpath = self._path_to_utf8(path) d = self._get_parent_or_node(path) def _got_parent_or_node( (parent_or_node, childname) ): if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY) + + direntry = self._direntry_for(parent_or_node, childname) + d2 = self._sync_heisenfiles(userpath, direntry) + if childname is None: node = parent_or_node - d2 = node.get_current_size() + d2.addCallback(lambda ign: node.get_current_size()) d2.addCallback(lambda size: _populate_attrs(node, {'readonly': node.is_unknown() or node.is_readonly()}, size=size)) - return d2 else: parent = parent_or_node - d2 = parent.get_child_and_metadata_at_path([childname]) + d2.addCallback(lambda ign: parent.get_child_and_metadata_at_path([childname])) def _got( (child, metadata) ): if noisy: self.log("_got( (%r, %r) )" % (child, metadata), level=NOISY) assert IDirectoryNode.providedBy(parent), parent @@ -1409,10 +1551,10 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): if noisy: self.log("_nosuch(%r)" % (err,), level=NOISY) err.trap(NoSuchChildError) direntry = self._direntry_for(parent, childname) - if noisy: self.log("checking open files:\nself._open_files = %r\nall_open_files = %r\ndirentry=%r" % - (self._open_files, all_open_files, direntry), level=NOISY) - if direntry in all_open_files: - (files, opentime) = all_open_files[direntry] + if noisy: self.log("checking open files:\nself._heisenfiles = %r\nall_heisenfiles = %r\ndirentry=%r" % + (self._heisenfiles, all_heisenfiles, direntry), level=NOISY) + if direntry in all_heisenfiles: + (files, opentime) = all_heisenfiles[direntry] sftptime = _to_sftp_time(opentime) # A file that has been opened for writing necessarily has permissions rw-rw-rw-. return {'permissions': S_IFREG | 0666, @@ -1424,7 +1566,7 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): } return err d2.addCallbacks(_got, _nosuch) - return d2 + return d2 d.addCallback(_got_parent_or_node) d.addBoth(_convert_error, request) return d @@ -1503,8 +1645,10 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): def realPath(self, pathstring): self.log(".realPath(%r)" % (pathstring,), level=OPERATIONAL) - path_utf8 = [p.encode('utf-8') for p in self._path_from_string(pathstring)] - return "/" + "/".join(path_utf8) + return self._path_to_utf8(self._path_from_string(pathstring)) + + def _path_to_utf8(self, path): + return (u"/" + u"/".join(path)).encode('utf-8') def _path_from_string(self, pathstring): if noisy: self.log("CONVERT %r" % (pathstring,), level=NOISY) @@ -1537,11 +1681,12 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): def _get_root(self, path): # return Deferred (root, remaining_path) + d = defer.succeed(None) if path and path[0] == u"uri": - d = defer.maybeDeferred(self._client.create_node_from_uri, path[1].encode('utf-8')) + d.addCallback(lambda ign: self._client.create_node_from_uri(path[1].encode('utf-8'))) d.addCallback(lambda root: (root, path[2:])) else: - d = defer.succeed((self._root, path)) + d.addCallback(lambda ign: (self._root, path)) return d def _get_parent_or_node(self, path): diff --git a/src/allmydata/test/test_sftp.py b/src/allmydata/test/test_sftp.py index b3baef2f..b2ed0410 100644 --- a/src/allmydata/test/test_sftp.py +++ b/src/allmydata/test/test_sftp.py @@ -34,7 +34,7 @@ from allmydata.test.common import ShouldFailMixin timeout = 240 class Handler(GridTestMixin, ShouldFailMixin, unittest.TestCase): - """This is a no-network unit test of the SFTPHandler class.""" + """This is a no-network unit test of the SFTPUserHandler and the abstractions it uses.""" if not have_pycrypto: skip = "SFTP support requires pycrypto, which is not installed"