from twisted.python import runtime
from twisted.application import service
-from allmydata.interfaces import IDirectoryNode, NoSuchChildError, ExistingChildError
+from allmydata.interfaces import IDirectoryNode
+from allmydata.util import log
from allmydata.util.fileutil import abspath_expanduser_unicode, precondition_abspath
from allmydata.util.encodingutil import listdir_unicode, to_filepath, \
unicode_from_filepath, quote_local_unicode_path, FilenameEncodingError
if self._parent.is_unknown() or self._parent.is_readonly():
raise AssertionError("The URI in 'private/drop_upload_dircap' is not a writecap to a directory.")
- self._uploaded_callback = lambda ign: None
+ self._processed_callback = lambda ign: None
self._ignore_count = 0
self._notifier = inotify.INotify()
def _process(self, path):
d = defer.succeed(None)
- # FIXME (ticket #1712): if this already exists as a mutable file, we replace the
- # directory entry, but we should probably modify the file (as the SFTP frontend does).
- def _add_file(ignore, name):
+ def _add_file(name):
u = FileName(path, self._convergence)
return self._parent.add_file(name, u)
- def _add_dir(ignore, name):
+ def _add_dir(name):
self._notifier.watch(to_filepath(path), mask=self.mask, callbacks=[self._notify], recursive=True)
u = Data("", self._convergence)
name += "@_"
d2 = self._parent.add_file(name, u)
- def _err(f):
- f.trap(ExistingChildError)
- self._log("subdirectory %r already exists" % (path,))
- d2.addCallbacks(lambda ign: self._log("created subdirectory %r" % (path,)), _err)
+ def _succeeded(ign):
+ self._log("created subdirectory %r" % (path,))
+ self._stats_provider.count('drop_upload.directories_created', 1)
def _failed(f):
- self._log("failed to create subdirectory %r due to %r" % (path, f))
- self._stats_provider.count('drop_upload.objects_failed', 1)
- d2.addErrback(_failed)
+ self._log("failed to create subdirectory %r" % (path,))
+ return f
+ d2.addCallbacks(_succeeded, _failed)
d2.addCallback(lambda ign: self._scan(path))
return d2
self._pending.remove(path)
relpath = os.path.relpath(path, self._local_dir)
name = magicpath.path2magic(relpath)
- # XXX
- #name = os.path.basename(path)
if not os.path.exists(path):
- self._log("uploader: not uploading non-existent file.")
+ self._log("drop-upload: notified object %r disappeared "
+ "(this is normal for temporary objects)" % (path,))
self._stats_provider.count('drop_upload.objects_disappeared', 1)
- return NoSuchChildError("not uploading non-existent file")
+ return None
elif os.path.islink(path):
- self._log("operator ERROR: symlink not being processed.")
- return Failure()
+ raise Exception("symlink not being processed")
if os.path.isdir(path):
- d.addCallback(_add_dir, name)
- self._stats_provider.count('drop_upload.directories_created', 1)
- return None
+ return _add_dir(name)
elif os.path.isfile(path):
- d.addCallback(_add_file, name)
+ d2 = _add_file(name)
def add_db_entry(filenode):
filecap = filenode.get_uri()
s = os.stat(path)
ctime = s[stat.ST_CTIME]
mtime = s[stat.ST_MTIME]
self._db.did_upload_file(filecap, path, mtime, ctime, size)
- d.addCallback(add_db_entry)
- self._stats_provider.count('drop_upload.files_uploaded', 1)
- return None
+ self._stats_provider.count('drop_upload.files_uploaded', 1)
+ d2.addCallback(add_db_entry)
+ return d2
else:
- self._log("operator ERROR: non-directory/non-regular file not being processed.")
- return Failure()
+ raise Exception("non-directory/non-regular file not being processed")
d.addCallback(_maybe_upload)
- def _succeeded(ign):
+ def _succeeded(res):
self._stats_provider.count('drop_upload.objects_queued', -1)
- self._stats_provider.count('drop_upload.objects_uploaded', 1)
-
+ self._stats_provider.count('drop_upload.objects_succeeded', 1)
+ return res
def _failed(f):
self._stats_provider.count('drop_upload.objects_queued', -1)
- if os.path.exists(path):
- self._log("drop-upload: %r failed to upload due to %r" % (path, f))
- self._stats_provider.count('drop_upload.objects_failed', 1)
- return f
- else:
- self._log("drop-upload: notified object %r disappeared "
- "(this is normal for temporary objects): %r" % (path, f))
- return None
-
+ self._stats_provider.count('drop_upload.objects_failed', 1)
+ self._log("%r while processing %r" % (f, path))
+ return f
d.addCallbacks(_succeeded, _failed)
- d.addBoth(self._do_upload_callback)
+ d.addBoth(self._do_processed_callback)
return d
- def _do_upload_callback(self, res):
+ def _do_processed_callback(self, res):
if self._ignore_count == 0:
- self._uploaded_callback(res)
+ self._processed_callback(res)
else:
self._ignore_count -= 1
+ return None # intentionally suppress failures, which have already been logged
- def set_uploaded_callback(self, callback, ignore_count=0):
+ def set_processed_callback(self, callback, ignore_count=0):
"""
- This sets a function that will be called after a file has been uploaded.
+ This sets a function that will be called after a notification has been processed
+ (successfully or unsuccessfully).
"""
- self._uploaded_callback = callback
+ self._processed_callback = callback
self._ignore_count = ignore_count
def finish(self, for_tests=False):
return service.MultiService.disownServiceParent(self)
def _log(self, msg):
- self._client.log(msg)
+ self._client.log("drop-upload: " + msg)
print "_log:", msg
#open("events", "ab+").write(msg)
import os, sys, stat, time
from twisted.trial import unittest
-from twisted.python import runtime
from twisted.internet import defer
from allmydata.interfaces import IDirectoryNode, NoSuchChildError
def _check_move_empty_tree(res):
self.mkdir_nonascii(empty_tree_dir)
d2 = defer.Deferred()
- self.uploader.set_uploaded_callback(d2.callback, ignore_count=0)
+ self.uploader.set_processed_callback(d2.callback, ignore_count=0)
os.rename(empty_tree_dir, new_empty_tree_dir)
self.notify(to_filepath(new_empty_tree_dir), self.inotify.IN_MOVED_TO)
return d2
d.addCallback(_check_move_empty_tree)
- d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_uploaded'), 1))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_succeeded'), 1))
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_uploaded'), 0))
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_queued'), 0))
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.directories_created'), 1))
self.mkdir_nonascii(small_tree_dir)
fileutil.write(abspath_expanduser_unicode(u"what", base=small_tree_dir), "say when")
d2 = defer.Deferred()
- self.uploader.set_uploaded_callback(d2.callback, ignore_count=1)
+ self.uploader.set_processed_callback(d2.callback, ignore_count=1)
os.rename(small_tree_dir, new_small_tree_dir)
self.notify(to_filepath(new_small_tree_dir), self.inotify.IN_MOVED_TO)
return d2
d.addCallback(_check_move_small_tree)
- d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_uploaded'), 3))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_succeeded'), 3))
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_uploaded'), 1))
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_queued'), 0))
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.directories_created'), 2))
def _check_moved_tree_is_watched(res):
d2 = defer.Deferred()
- self.uploader.set_uploaded_callback(d2.callback, ignore_count=0)
+ self.uploader.set_processed_callback(d2.callback, ignore_count=0)
fileutil.write(abspath_expanduser_unicode(u"another", base=new_small_tree_dir), "file")
self.notify(to_filepath(abspath_expanduser_unicode(u"another", base=new_small_tree_dir)), self.inotify.IN_CLOSE_WRITE)
return d2
d.addCallback(_check_moved_tree_is_watched)
- d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_uploaded'), 4))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_succeeded'), 4))
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_uploaded'), 2))
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_queued'), 0))
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.directories_created'), 2))
# Files that are moved out of the upload directory should no longer be watched.
def _move_dir_away(ign):
os.rename(new_empty_tree_dir, empty_tree_dir)
- self.notify(to_filepath(new_empty_tree_dir), self.inotify.IN_MOVED_FROM)
+ # Wuh? Why don't we get this event for the real test?
+ #self.notify(to_filepath(new_empty_tree_dir), self.inotify.IN_MOVED_FROM)
d.addCallback(_move_dir_away)
def create_file(val):
test_file = abspath_expanduser_unicode(u"what", base=empty_tree_dir)
return
d.addCallback(create_file)
d.addCallback(lambda ign: time.sleep(1))
- d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_uploaded'), 4))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_succeeded'), 4))
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_uploaded'), 2))
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_queued'), 0))
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.directories_created'), 2))
def create_file(val):
d2 = defer.Deferred()
- self.uploader.set_uploaded_callback(d2.callback)
+ self.uploader.set_processed_callback(d2.callback)
test_file = abspath_expanduser_unicode(u"what", base=self.local_dir)
fileutil.write(test_file, "meow")
self.notify(to_filepath(test_file), self.inotify.IN_CLOSE_WRITE)
return d2
d.addCallback(create_file)
- d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_uploaded'), 1))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_succeeded'), 1))
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_queued'), 0))
d.addCallback(self._cleanup)
d.addCallback(_restart)
d.addCallback(self._create_uploader)
d.addCallback(lambda ign: time.sleep(3))
- d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_uploaded'), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_succeeded'), 0))
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_queued'), 0))
d.addBoth(self._cleanup)
return d
# Write to the same file again with different data.
d.addCallback(lambda ign: self._check_file(u"short", "different"))
-
+
# Test that temporary files are not uploaded.
d.addCallback(lambda ign: self._check_file(u"tempfile", "test", temporary=True))
return d
def _check_file(self, name_u, data, temporary=False):
- previously_uploaded = self._get_count('drop_upload.objects_uploaded')
+ previously_uploaded = self._get_count('drop_upload.objects_succeeded')
previously_disappeared = self._get_count('drop_upload.objects_disappeared')
d = defer.Deferred()
# Note: this relies on the fact that we only get one IN_CLOSE_WRITE notification per file
# (otherwise we would get a defer.AlreadyCalledError). Should we be relying on that?
- self.uploader.set_uploaded_callback(d.callback)
+ self.uploader.set_processed_callback(d.callback)
path_u = abspath_expanduser_unicode(name_u, base=self.local_dir)
path = to_filepath(path_u)
d.addCallback(lambda ign: self.upload_dirnode.get(name_u))
d.addCallback(download_to_data)
d.addCallback(lambda actual_data: self.failUnlessReallyEqual(actual_data, data))
- d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_uploaded'),
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_succeeded'),
previously_uploaded + 1))
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_queued'), 0))