return not r.was_uploaded()
def _scan(self, localpath):
+ if not os.path.isdir(localpath):
+ raise AssertionError("Programmer error: _scan() must be passed a directory path.")
quoted_path = quote_local_unicode_path(localpath)
try:
children = listdir_unicode(localpath)
def Resume(self):
self.is_upload_ready = True
+ # XXX
+ self._turn_deque()
def upload_ready(self):
"""upload_ready is used to signal us to start
def _notify(self, opaque, path, events_mask):
self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
self._stats_provider.count('drop_upload.objects_queued', 1)
- if path not in self._pending:
- self._append_to_deque(path)
+ if path.path not in self._pending:
+ self._append_to_deque(path.path)
def _process(self, path):
d = defer.succeed(None)
# directory entry, but we should probably modify the file (as the SFTP frontend does).
def _add_file(ignore):
self._pending.remove(path)
- name = path.basename()
+ name = os.path.basename(path)
# on Windows the name is already Unicode
if sys.platform != "win32":
name = name.decode(get_filesystem_encoding())
- u = FileName(path.path, self._convergence)
+ u = FileName(path, self._convergence)
return self._parent.add_file(name, u)
def _add_dir(ignore):
- print "_add_dir %s" % (path.path,)
+ print "_add_dir %s" % (path,)
self._pending.remove(path)
- name = path.basename()
- dirname = path.path
+ name = os.path.basename(path)
+ dirname = path
# on Windows the name is already Unicode
if sys.platform != "win32":
name = name.decode(get_filesystem_encoding())
- dirname = path.path.decode(get_filesystem_encoding())
+ dirname = path.decode(get_filesystem_encoding())
- self._scan(dirname)
+ reactor.callLater(0, self._scan, dirname)
return self._parent.create_subdirectory(name)
def _maybe_upload(val):
- if not os.path.exists(path.path):
+ if not os.path.exists(path):
self._log("uploader: not uploading non-existent file.")
self._stats_provider.count('drop_upload.objects_disappeared', 1)
return NoSuchChildError("not uploading non-existent file")
- elif os.path.islink(path.path):
+ elif os.path.islink(path):
self._log("operator ERROR: symlink not being processed.")
return failure.Failure()
- if os.path.isdir(path.path):
+ if os.path.isdir(path):
d.addCallback(_add_dir)
self._stats_provider.count('drop_upload.directories_created', 1)
return None
- elif os.path.isfile(path.path):
+ elif os.path.isfile(path):
d.addCallback(_add_file)
self._stats_provider.count('drop_upload.files_uploaded', 1)
return None
def _failed(f):
self._stats_provider.count('drop_upload.objects_queued', -1)
- if os.path.exists(path.path):
- self._log("drop-upload: %r failed to upload due to %r" % (path.path, f))
+ if os.path.exists(path):
+ self._log("drop-upload: %r failed to upload due to %r" % (path, f))
self._stats_provider.count('drop_upload.objects_failed', 1)
return f
else:
self._log("drop-upload: notified object %r disappeared "
- "(this is normal for temporary objects): %r" % (path.path, f))
+ "(this is normal for temporary objects): %r" % (path, f))
return None
d.addCallbacks(_succeeded, _failed)
import os, sys
import shutil
+import time
from twisted.trial import unittest
from twisted.python import filepath, runtime
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.directories_created'), 1))
def testMoveSmallTree(res):
- tree_dir = os.path.join(self.basedir, 'creepy_tree')
+ tree_name = 'small_tree'
+ tree_dir = os.path.join(self.basedir, tree_name)
os.mkdir(tree_dir)
- os.path.join(tree_dir, u"tree_frog")
f = open(os.path.join(tree_dir, 'what'), "wb")
f.write("meow")
f.close()
- os.rename(tree_dir, os.path.join(self.local_dir,'creepy_tree'))
- d = defer.Deferred()
- self.uploader.set_uploaded_callback(d.callback)
- return d
+ os.rename(tree_dir, os.path.join(self.local_dir, tree_name))
+ return res
+
d.addCallback(testMoveSmallTree)
- d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_uploaded'), 2))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_uploaded'), 3))
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.directories_created'), 2))
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_uploaded'), 1))
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_queued'), 0))
d = self.client.create_dirnode()
d.addCallback(self._made_upload_dir)
d.addCallback(lambda ign: self.uploader.Pause())
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_uploaded'), 0))
def create_file(val):
print "creating file..."
myFile = os.path.join(self.local_dir, "what")
f.close()
return None
d.addCallback(create_file)
- d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_queued'), 0))
-
def resume_uploader(val):
self.uploader.Resume()
d = defer.Deferred()