From: Daira Hopwood Date: Tue, 28 Apr 2015 18:45:12 +0000 (+0100) Subject: WIP X-Git-Url: https://git.rkrishnan.org/vdrive/nxhtml.html?a=commitdiff_plain;h=6ad7352809a04aa3f7e3261d9327e301e98faf43;p=tahoe-lafs%2Ftahoe-lafs.git WIP Signed-off-by: Daira Hopwood --- diff --git a/src/allmydata/frontends/drop_upload.py b/src/allmydata/frontends/drop_upload.py index 90607863..78fee45e 100644 --- a/src/allmydata/frontends/drop_upload.py +++ b/src/allmydata/frontends/drop_upload.py @@ -89,6 +89,8 @@ class DropUploader(service.MultiService): return not r.was_uploaded() def _scan(self, localpath): + if not os.path.isdir(localpath): + raise AssertionError("Programmer error: _scan() must be passed a directory path.") quoted_path = quote_local_unicode_path(localpath) try: children = listdir_unicode(localpath) @@ -137,6 +139,8 @@ class DropUploader(service.MultiService): def Resume(self): self.is_upload_ready = True + # XXX + self._turn_deque() def upload_ready(self): """upload_ready is used to signal us to start @@ -163,8 +167,8 @@ class DropUploader(service.MultiService): def _notify(self, opaque, path, events_mask): self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask)))) self._stats_provider.count('drop_upload.objects_queued', 1) - if path not in self._pending: - self._append_to_deque(path) + if path.path not in self._pending: + self._append_to_deque(path.path) def _process(self, path): d = defer.succeed(None) @@ -173,40 +177,40 @@ class DropUploader(service.MultiService): # directory entry, but we should probably modify the file (as the SFTP frontend does). def _add_file(ignore): self._pending.remove(path) - name = path.basename() + name = os.path.basename(path) # on Windows the name is already Unicode if sys.platform != "win32": name = name.decode(get_filesystem_encoding()) - u = FileName(path.path, self._convergence) + u = FileName(path, self._convergence) return self._parent.add_file(name, u) def _add_dir(ignore): - print "_add_dir %s" % (path.path,) + print "_add_dir %s" % (path,) self._pending.remove(path) - name = path.basename() - dirname = path.path + name = os.path.basename(path) + dirname = path # on Windows the name is already Unicode if sys.platform != "win32": name = name.decode(get_filesystem_encoding()) - dirname = path.path.decode(get_filesystem_encoding()) + dirname = path.decode(get_filesystem_encoding()) - self._scan(dirname) + reactor.callLater(0, self._scan, dirname) return self._parent.create_subdirectory(name) def _maybe_upload(val): - if not os.path.exists(path.path): + if not os.path.exists(path): self._log("uploader: not uploading non-existent file.") self._stats_provider.count('drop_upload.objects_disappeared', 1) return NoSuchChildError("not uploading non-existent file") - elif os.path.islink(path.path): + elif os.path.islink(path): self._log("operator ERROR: symlink not being processed.") return failure.Failure() - if os.path.isdir(path.path): + if os.path.isdir(path): d.addCallback(_add_dir) self._stats_provider.count('drop_upload.directories_created', 1) return None - elif os.path.isfile(path.path): + elif os.path.isfile(path): d.addCallback(_add_file) self._stats_provider.count('drop_upload.files_uploaded', 1) return None @@ -222,13 +226,13 @@ class DropUploader(service.MultiService): def _failed(f): self._stats_provider.count('drop_upload.objects_queued', -1) - if os.path.exists(path.path): - self._log("drop-upload: %r failed to upload due to %r" % (path.path, f)) + if os.path.exists(path): + self._log("drop-upload: %r failed to upload due to %r" % (path, f)) self._stats_provider.count('drop_upload.objects_failed', 1) return f else: self._log("drop-upload: notified object %r disappeared " - "(this is normal for temporary objects): %r" % (path.path, f)) + "(this is normal for temporary objects): %r" % (path, f)) return None d.addCallbacks(_succeeded, _failed) diff --git a/src/allmydata/test/test_drop_upload.py b/src/allmydata/test/test_drop_upload.py index aebcd9f9..b324f6fd 100644 --- a/src/allmydata/test/test_drop_upload.py +++ b/src/allmydata/test/test_drop_upload.py @@ -1,6 +1,7 @@ import os, sys import shutil +import time from twisted.trial import unittest from twisted.python import filepath, runtime @@ -100,18 +101,17 @@ class DropUploadTestMixin(GridTestMixin, ShouldFailMixin, ReallyEqualMixin, NonA d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.directories_created'), 1)) def testMoveSmallTree(res): - tree_dir = os.path.join(self.basedir, 'creepy_tree') + tree_name = 'small_tree' + tree_dir = os.path.join(self.basedir, tree_name) os.mkdir(tree_dir) - os.path.join(tree_dir, u"tree_frog") f = open(os.path.join(tree_dir, 'what'), "wb") f.write("meow") f.close() - os.rename(tree_dir, os.path.join(self.local_dir,'creepy_tree')) - d = defer.Deferred() - self.uploader.set_uploaded_callback(d.callback) - return d + os.rename(tree_dir, os.path.join(self.local_dir, tree_name)) + return res + d.addCallback(testMoveSmallTree) - d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_uploaded'), 2)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_uploaded'), 3)) d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.directories_created'), 2)) d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_uploaded'), 1)) d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_queued'), 0)) @@ -131,6 +131,7 @@ class DropUploadTestMixin(GridTestMixin, ShouldFailMixin, ReallyEqualMixin, NonA d = self.client.create_dirnode() d.addCallback(self._made_upload_dir) d.addCallback(lambda ign: self.uploader.Pause()) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_uploaded'), 0)) def create_file(val): print "creating file..." myFile = os.path.join(self.local_dir, "what") @@ -139,8 +140,6 @@ class DropUploadTestMixin(GridTestMixin, ShouldFailMixin, ReallyEqualMixin, NonA f.close() return None d.addCallback(create_file) - d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_queued'), 0)) - def resume_uploader(val): self.uploader.Resume() d = defer.Deferred()