From: david-sarah <david-sarah@jacaranda.org>
Date: Wed, 9 Jun 2010 08:00:03 +0000 (-0700)
Subject: SFTP: fix most significant memory leak described in #1045 (due to a file being added to all_heisenfiles under more than one direntry when renamed).
X-Git-Url: https://git.rkrishnan.org/%5B/COPYING.TGPPL.html?a=commitdiff_plain;h=3c44389440097dff3da6d0aecefb771a49ed7b71;p=tahoe-lafs%2Ftahoe-lafs.git

SFTP: fix most significant memory leak described in #1045 (due to a file being added to all_heisenfiles under more than one direntry when renamed).
---

diff --git a/src/allmydata/frontends/sftpd.py b/src/allmydata/frontends/sftpd.py
index 975b6c4d..10741823 100644
--- a/src/allmydata/frontends/sftpd.py
+++ b/src/allmydata/frontends/sftpd.py
@@ -1,5 +1,6 @@
 
 import os, tempfile, heapq, binascii, traceback, array, stat, struct
+from types import NoneType
 from stat import S_IFREG, S_IFDIR
 from time import time, strftime, localtime
 
@@ -275,6 +276,8 @@ def _attrs_to_metadata(attrs):
 
 
 def _direntry_for(filenode_or_parent, childname, filenode=None):
+    assert isinstance(childname, (unicode, NoneType)), childname
+
     if childname is None:
         filenode_or_parent = filenode
 
@@ -583,7 +586,7 @@ class ShortReadOnlySFTPFile(PrefixingLogMixin):
         PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
         if noisy: self.log(".__init__(%r, %r, %r)" % (userpath, filenode, metadata), level=NOISY)
 
-        assert IFileNode.providedBy(filenode), filenode
+        assert isinstance(userpath, str) and IFileNode.providedBy(filenode), (userpath, filenode)
         self.filenode = filenode
         self.metadata = metadata
         self.async = download_to_data(filenode)
@@ -666,6 +669,7 @@ class GeneralSFTPFile(PrefixingLogMixin):
         if noisy: self.log(".__init__(%r, %r = %r, %r, <convergence censored>)" %
                            (userpath, flags, _repr_flags(flags), close_notify), level=NOISY)
 
+        assert isinstance(userpath, str), userpath
         self.userpath = userpath
         self.flags = flags
         self.close_notify = close_notify
@@ -688,6 +692,7 @@ class GeneralSFTPFile(PrefixingLogMixin):
         self.log(".open(parent=%r, childname=%r, filenode=%r, metadata=%r)" %
                  (parent, childname, filenode, metadata), level=OPERATIONAL)
 
+        assert isinstance(childname, (unicode, NoneType)), childname
         # If the file has been renamed, the new (parent, childname) takes precedence.
         if self.parent is None:
             self.parent = parent
@@ -696,7 +701,7 @@ class GeneralSFTPFile(PrefixingLogMixin):
         self.filenode = filenode
         self.metadata = metadata
 
-        assert not self.closed
+        assert not self.closed, self
         tempfile_maker = EncryptedTemporaryFile
 
         if (self.flags & FXF_TRUNC) or not filenode:
@@ -729,9 +734,16 @@ class GeneralSFTPFile(PrefixingLogMixin):
         if noisy: self.log("open done", level=NOISY)
         return self
 
+    def get_userpath(self):
+        return self.userpath
+
+    def get_direntry(self):
+        return _direntry_for(self.parent, self.childname)
+
     def rename(self, new_userpath, new_parent, new_childname):
         self.log(".rename(%r, %r, %r)" % (new_userpath, new_parent, new_childname), level=OPERATIONAL)
 
+        assert isinstance(new_userpath, str) and isinstance(new_childname, unicode), (new_userpath, new_childname)
         self.userpath = new_userpath
         self.parent = new_parent
         self.childname = new_childname
@@ -1010,26 +1022,30 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
             for f in files:
                 f.abandon()
 
-    def _add_heisenfiles_by_path(self, userpath, files_to_add):
-        self.log("._add_heisenfiles_by_path(%r, %r)" % (userpath, files_to_add), level=OPERATIONAL)
+    def _add_heisenfile_by_path(self, file):
+        self.log("._add_heisenfile_by_path(%r)" % (file,), level=OPERATIONAL)
 
+        userpath = file.get_userpath()
         if userpath in self._heisenfiles:
-            self._heisenfiles[userpath] += files_to_add
+            self._heisenfiles[userpath] += [file]
         else:
-            self._heisenfiles[userpath] = files_to_add
+            self._heisenfiles[userpath] = [file]
 
-    def _add_heisenfiles_by_direntry(self, direntry, files_to_add):
-        self.log("._add_heisenfiles_by_direntry(%r, %r)" % (direntry, files_to_add), level=OPERATIONAL)
+    def _add_heisenfile_by_direntry(self, file):
+        self.log("._add_heisenfile_by_direntry(%r)" % (file,), level=OPERATIONAL)
 
+        direntry = file.get_direntry()
         if direntry:
             if direntry in all_heisenfiles:
-                all_heisenfiles[direntry] += files_to_add
+                all_heisenfiles[direntry] += [file]
             else:
-                all_heisenfiles[direntry] = files_to_add
+                all_heisenfiles[direntry] = [file]
 
     def _abandon_any_heisenfiles(self, userpath, direntry):
         request = "._abandon_any_heisenfiles(%r, %r)" % (userpath, direntry)
         self.log(request, level=OPERATIONAL)
+        
+        assert isinstance(userpath, str), userpath
 
         # First we synchronously mark all heisenfiles matching the userpath or direntry
         # as abandoned, and remove them from the two heisenfile dicts. Then we .sync()
@@ -1078,6 +1094,12 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
                    (from_userpath, from_parent, from_childname, to_userpath, to_parent, to_childname, overwrite))
         self.log(request, level=OPERATIONAL)
 
+        assert (isinstance(from_userpath, str) and isinstance(from_childname, unicode) and
+                isinstance(to_userpath, str) and isinstance(to_childname, unicode)), \
+               (from_userpath, from_childname, to_userpath, to_childname)
+
+        if noisy: self.log("all_heisenfiles = %r\nself._heisenfiles = %r" % (all_heisenfiles, self._heisenfiles), level=NOISY)
+
         # First we synchronously rename all heisenfiles matching the userpath or direntry.
         # Then we .sync() each file that we renamed.
         #
@@ -1107,8 +1129,12 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         from_direntry = _direntry_for(from_parent, from_childname)
         to_direntry = _direntry_for(to_parent, to_childname)
 
+        if noisy: self.log("from_direntry = %r, to_direntry = %r in %r" %
+                           (from_direntry, to_direntry, request), level=NOISY)
+
         if not overwrite and (to_userpath in self._heisenfiles or to_direntry in all_heisenfiles):
             def _existing(): raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
+            if noisy: self.log("existing", level=NOISY)
             return defer.execute(_existing)
 
         from_files = []
@@ -1121,18 +1147,20 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
 
         if noisy: self.log("from_files = %r in %r" % (from_files, request), level=NOISY)
 
-        self._add_heisenfiles_by_direntry(to_direntry, from_files)
-        self._add_heisenfiles_by_path(to_userpath, from_files)
-
         for f in from_files:
             f.rename(to_userpath, to_parent, to_childname)
+            self._add_heisenfile_by_path(f)
+            self._add_heisenfile_by_direntry(f)
 
         d = defer.succeed(None)
         for f in from_files:
             d.addBoth(f.sync)
 
         def _done(ign):
-            self.log("done %r" % (request,), level=OPERATIONAL)
+            if noisy:
+                self.log("done %r\nall_heisenfiles = %r\nself._heisenfiles = %r" % (request, all_heisenfiles, self._heisenfiles), level=OPERATIONAL)
+            else:
+                self.log("done %r" % (request,), level=OPERATIONAL)
             return len(from_files) > 0
         d.addBoth(_done)
         return d
@@ -1141,6 +1169,8 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         request = "._update_attrs_for_heisenfiles(%r, %r, %r)" % (userpath, direntry, attrs)
         self.log(request, level=OPERATIONAL)
 
+        assert isinstance(userpath, str) and isinstance(direntry, str), (userpath, direntry)
+
         files = []
         if direntry in all_heisenfiles:
             files = all_heisenfiles[direntry]
@@ -1171,6 +1201,8 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         request = "._sync_heisenfiles(%r, %r, ignore=%r)" % (userpath, direntry, ignore)
         self.log(request, level=OPERATIONAL)
 
+        assert isinstance(userpath, str) and isinstance(direntry, (str, NoneType)), (userpath, direntry)
+
         files = []
         if direntry in all_heisenfiles:
             files = all_heisenfiles[direntry]
@@ -1193,6 +1225,8 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
     def _remove_heisenfile(self, userpath, parent, childname, file_to_remove):
         if noisy: self.log("._remove_heisenfile(%r, %r, %r, %r)" % (userpath, parent, childname, file_to_remove), level=NOISY)
 
+        assert isinstance(userpath, str) and isinstance(childname, (unicode, NoneType)), (userpath, childname)
+
         direntry = _direntry_for(parent, childname)
         if direntry in all_heisenfiles:
             all_old_files = all_heisenfiles[direntry]
@@ -1210,12 +1244,15 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
             else:
                 del self._heisenfiles[userpath]
 
+        if noisy: self.log("all_heisenfiles = %r\nself._heisenfiles = %r" % (all_heisenfiles, self._heisenfiles), level=NOISY)
+
     def _make_file(self, existing_file, userpath, flags, parent=None, childname=None, filenode=None, metadata=None):
         if noisy: self.log("._make_file(%r, %r, %r = %r, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
                            (existing_file, userpath, flags, _repr_flags(flags), parent, childname, filenode, metadata),
                            level=NOISY)
 
-        assert metadata is None or 'no-write' in metadata, metadata
+        assert (isinstance(userpath, str) and isinstance(childname, (unicode, NoneType)) and
+                (metadata is None or 'no-write' in metadata)), (userpath, childname, metadata)
 
         writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0
         direntry = _direntry_for(parent, childname, filenode)
@@ -1233,7 +1270,7 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
             def _got_file(file):
                 file.open(parent=parent, childname=childname, filenode=filenode, metadata=metadata)
                 if writing:
-                    self._add_heisenfiles_by_direntry(direntry, [file])
+                    self._add_heisenfile_by_direntry(file)
                 return file
             d.addCallback(_got_file)
         return d
@@ -1274,7 +1311,7 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
 
         if flags & (FXF_WRITE | FXF_CREAT):
             file = GeneralSFTPFile(userpath, flags, self._remove_heisenfile, self._convergence)
-            self._add_heisenfiles_by_path(userpath, [file])
+            self._add_heisenfile_by_path(file)
         else:
             # We haven't decided which file implementation to use yet.
             file = None
@@ -1796,6 +1833,8 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
     def _path_from_string(self, pathstring):
         if noisy: self.log("CONVERT %r" % (pathstring,), level=NOISY)
 
+        assert isinstance(pathstring, str), pathstring
+
         # The home directory is the root directory.
         pathstring = pathstring.strip("/")
         if pathstring == "" or pathstring == ".":
diff --git a/src/allmydata/test/test_sftp.py b/src/allmydata/test/test_sftp.py
index 0bc20e87..66c8abbb 100644
--- a/src/allmydata/test/test_sftp.py
+++ b/src/allmydata/test/test_sftp.py
@@ -49,11 +49,11 @@ class Handler(GridTestMixin, ShouldFailMixin, ReallyEqualMixin, unittest.TestCas
             if isinstance(res, Failure):
                 res.trap(sftp.SFTPError)
                 self.failUnlessReallyEqual(res.value.code, expected_code,
-                                           "%s was supposed to raise SFTPError(%d), not SFTPError(%d): %s" %
+                                           "%s was supposed to raise SFTPError(%r), not SFTPError(%r): %s" %
                                            (which, expected_code, res.value.code, res))
             else:
                 print '@' + '@'.join(s)
-                self.fail("%s was supposed to raise SFTPError(%d), not get %r" %
+                self.fail("%s was supposed to raise SFTPError(%r), not get %r" %
                           (which, expected_code, res))
         d.addBoth(_done)
         return d
@@ -1294,3 +1294,42 @@ class Handler(GridTestMixin, ShouldFailMixin, ReallyEqualMixin, unittest.TestCas
                                          self.handler.extendedRequest, "foo", "bar"))
 
         return d
+
+    def test_memory_leak(self):
+        d0 = self._set_up("memory_leak")
+
+        def _leaky(ign, i):
+            new_i = "new_%r" % (i,)
+            d = defer.succeed(None)
+            # Copied from test_openFile_write above:
+
+            # it should be possible to rename even before the open has completed
+            def _open_and_rename_race(ign):
+                slow_open = defer.Deferred()
+                reactor.callLater(1, slow_open.callback, None)
+                d2 = self.handler.openFile("new", sftp.FXF_WRITE | sftp.FXF_CREAT, {}, delay=slow_open)
+
+                # deliberate race between openFile and renameFile
+                d3 = self.handler.renameFile("new", new_i)
+                del d3
+                return d2
+            d.addCallback(_open_and_rename_race)
+            def _write_rename_race(wf):
+                d2 = wf.writeChunk(0, "abcd")
+                d2.addCallback(lambda ign: wf.close())
+                return d2
+            d.addCallback(_write_rename_race)
+            d.addCallback(lambda ign: self.root.get(unicode(new_i)))
+            d.addCallback(lambda node: download_to_data(node))
+            d.addCallback(lambda data: self.failUnlessReallyEqual(data, "abcd"))
+            d.addCallback(lambda ign:
+                          self.shouldFail(NoSuchChildError, "rename new while open", "new",
+                                          self.root.get, u"new"))
+            return d
+
+        for index in range(3):
+            d0.addCallback(_leaky, index)
+
+        d0.addCallback(lambda ign: self.failUnlessEqual(sftpd.all_heisenfiles, {}))
+        d0.addCallback(lambda ign: self.failUnlessEqual(self.handler._heisenfiles, {}))
+        return d0