from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath
+
from allmydata.util.assertutil import precondition
from allmydata.util.encodingutil import listdir_unicode, to_filepath, \
unicode_from_filepath, quote_local_unicode_path, FilenameEncodingError
class MagicFolder(service.MultiService):
name = 'magic-folder'
- def __init__(self, client, upload_dircap, collective_dircap, local_dir, dbfile, inotify=None,
+ def __init__(self, client, upload_dircap, collective_dircap, local_dir_path_u, dbfile, inotify=None,
pending_delay=1.0):
- precondition_abspath(local_dir)
+ precondition_abspath(local_dir_path_u)
service.MultiService.__init__(self)
- local_path = to_filepath(local_dir)
+ local_path = to_filepath(local_dir_path_u)
db = backupdb.get_backupdb(dbfile, create_version=(backupdb.SCHEMA_v3, 3))
if db is None:
if not local_path.exists():
raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
"but there is no directory at that location."
- % quote_local_unicode_path(local_dir))
+ % quote_local_unicode_path(local_dir_path_u))
if not local_path.isdir():
raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
"but the thing at that location is not a directory."
- % quote_local_unicode_path(local_dir))
+ % quote_local_unicode_path(local_dir_path_u))
- self.uploader = Uploader(client, local_path, db, upload_dircap, inotify, pending_delay)
+ self.uploader = Uploader(client, local_dir_path_u, db, upload_dircap, inotify, pending_delay)
self.downloader = Downloader(client, local_path, db, collective_dircap)
def startService(self):
class QueueMixin(object):
- def __init__(self, client, counter, local_path, db):
+ def __init__(self, client, local_path, db):
self._client = client
self._counter = client.stats_provider.count
self._local_path = local_path
self._deque = deque()
self._lazy_tail = defer.succeed(None)
self._pending = set()
- self._processed_callback = lambda ign: None
+ self._callback = lambda ign: None
self._ignore_count = 0
def _do_callback(self, res):
class Uploader(QueueMixin):
- def __init__(self, client, local_path, db, upload_dircap, inotify, pending_delay):
- QueueMixin.__init__(self, client, local_path, db)
+ def __init__(self, client, local_dir_path_u, db, upload_dircap, inotify, pending_delay):
+ QueueMixin.__init__(self, client, local_dir_path_u, db)
+
+ self.local_path = local_dir_path_u
+ self.is_ready = False
# TODO: allow a path rather than a cap URI.
self._upload_dirnode = self._client.create_node_from_uri(upload_dircap)
| self._inotify.IN_ONLYDIR
| IN_EXCL_UNLINK
)
- self._notifier.watch(self._local_path, mask=self.mask, callbacks=[self._notify],
+ self._notifier.watch(to_filepath(self.local_path), mask=self.mask, callbacks=[self._notify],
recursive=True)
def start_monitoring(self):
return d
def start_scanning(self):
- self._scan(self._local_dir)
+ self.is_ready = True
+ self._scan(self._local_path)
self._turn_deque()
def _scan(self, localpath):
d = defer.succeed(None)
def _add_file(encoded_name_u, version):
- uploadable = FileName(path_u, self._convergence)
+ uploadable = FileName(path_u, self._client.convergence)
return self._upload_dirnode.add_file(encoded_name_u, uploadable, metadata={"version":version}, overwrite=True)
def _add_dir(encoded_name_u):
self._notifier.watch(to_filepath(path_u), mask=self.mask, callbacks=[self._notify], recursive=True)
- uploadable = Data("", self._convergence)
+ uploadable = Data("", self._client.convergence)
encoded_name_u += u"@_"
upload_d = self._upload_dirnode.add_file(encoded_name_u, uploadable, metadata={"version":0}, overwrite=True)
def _succeeded(ign):
return upload_d
def _maybe_upload(val):
- self._upload_pending.remove(path_u) # FIXME make _upload_pending hold relative paths
- relpath_u = os.path.relpath(path_u, self._local_dir)
+ self._pending.remove(path_u) # FIXME make _upload_pending hold relative paths
+ relpath_u = os.path.relpath(path_u, self.local_path)
encoded_name_u = magicpath.path2magic(relpath_u)
def get_metadata(result):
current_version = self._db.get_local_file_version(relpath_u) + 1
metadata['version'] = current_version
metadata['deleted'] = True
- empty_uploadable = Data("", self._convergence)
+ empty_uploadable = Data("", self._client.convergence)
return self._upload_dirnode.add_file(encoded_name_u, empty_uploadable, overwrite=True, metadata=metadata)
d2.addCallback(set_deleted)
d2.addCallback(lambda x: Exception("file does not exist"))
def _scan_remote_collective(self):
if self._collective_dirnode is None:
return
- upload_readonly_dircap = self._upload_dirnode.get_readonly_uri()
collective_dirmap_d = self._collective_dirnode.list()
- def do_filter(result):
- others = [x for x in result.keys() if result[x][0].get_readonly_uri() != upload_readonly_dircap]
+
+ def do_list(result):
+ others = [x for x in result.keys()]
return result, others
- collective_dirmap_d.addCallback(do_filter)
+ collective_dirmap_d.addCallback(do_list)
+
def scan_collective(result):
d = defer.succeed(None)
collective_dirmap, others_list = result
# FIXME move to QueueMixin
def _turn_deque(self):
- if self._stopped:
- return
+ #if self._stopped:
+ # return
try:
file_path, file_node, metadata = self._deque.pop()
except IndexError:
def _check_move_empty_tree(res):
self.mkdir_nonascii(empty_tree_dir)
d2 = defer.Deferred()
- self.magicfolder.set_processed_callback(d2.callback)
+ self.magicfolder.set_callback(d2.callback)
os.rename(empty_tree_dir, new_empty_tree_dir)
self.notify(to_filepath(new_empty_tree_dir), self.inotify.IN_MOVED_TO)
return d2
self.mkdir_nonascii(small_tree_dir)
fileutil.write(abspath_expanduser_unicode(u"what", base=small_tree_dir), "say when")
d2 = defer.Deferred()
- self.magicfolder.set_processed_callback(d2.callback, ignore_count=1)
+ self.magicfolder.set_callback(d2.callback, ignore_count=1)
os.rename(small_tree_dir, new_small_tree_dir)
self.notify(to_filepath(new_small_tree_dir), self.inotify.IN_MOVED_TO)
return d2
def _check_moved_tree_is_watched(res):
d2 = defer.Deferred()
- self.magicfolder.set_processed_callback(d2.callback)
+ self.magicfolder.set_callback(d2.callback)
fileutil.write(abspath_expanduser_unicode(u"another", base=new_small_tree_dir), "file")
self.notify(to_filepath(abspath_expanduser_unicode(u"another", base=new_small_tree_dir)), self.inotify.IN_CLOSE_WRITE)
return d2
def create_test_file(result):
d2 = defer.Deferred()
- self.magicfolder.set_processed_callback(d2.callback)
+ self.magicfolder.set_callback(d2.callback)
test_file = abspath_expanduser_unicode(u"what", base=self.local_dir)
fileutil.write(test_file, "meow")
self.notify(to_filepath(test_file), self.inotify.IN_CLOSE_WRITE)
# Note: this relies on the fact that we only get one IN_CLOSE_WRITE notification per file
# (otherwise we would get a defer.AlreadyCalledError). Should we be relying on that?
d = defer.Deferred()
- self.magicfolder.set_processed_callback(d.callback)
+ self.magicfolder.set_callback(d.callback)
path_u = abspath_expanduser_unicode(name_u, base=self.local_dir)
path = to_filepath(path_u)
def _check_version_in_dmd(self, magicfolder, relpath_u, expected_version):
encoded_name_u = magicpath.path2magic(relpath_u)
- d = magicfolder._upload_dirnode.get_child_and_metadata(encoded_name_u)
+ d = magicfolder.uploader._upload_dirnode.get_child_and_metadata(encoded_name_u)
def _check((filenode, metadata)):
self.failUnless(metadata, "no metadata for %r" % (relpath_u,))
self.failUnlessEqual(metadata['version'], expected_version)
def Alice_write_a_file(result):
print "Alice writes a file\n"
- self.file_path = abspath_expanduser_unicode(u"file1", base=self.alice_magicfolder._local_dir)
+ self.file_path = abspath_expanduser_unicode(u"file1", base=self.alice_magicfolder.uploader.local_path)
fileutil.write(self.file_path, "meow, meow meow. meow? meow meow! meow.")
self.magicfolder = self.alice_magicfolder
self.notify(to_filepath(self.file_path), self.inotify.IN_CLOSE_WRITE)
def Alice_wait_for_upload(result):
print "Alice waits for an upload\n"
d2 = defer.Deferred()
- self.alice_magicfolder.set_processed_callback(d2.callback)
+ self.alice_magicfolder.uploader.set_callback(d2.callback)
return d2
d.addCallback(Alice_wait_for_upload)
d.addCallback(lambda ign: self._check_version_in_dmd(self.alice_magicfolder, u"file1", 0))
def cleanup_Alice_and_Bob(result):
d = defer.succeed(None)
- d.addCallback(lambda ign: self.alice_magicfolder.finish(for_tests=True))
- d.addCallback(lambda ign: self.bob_magicfolder.finish(for_tests=True))
+ d.addCallback(lambda ign: self.alice_magicfolder.finish())
+ d.addCallback(lambda ign: self.bob_magicfolder.finish())
d.addCallback(lambda ign: result)
return d
d.addCallback(cleanup_Alice_and_Bob)
self.inotify = fake_inotify
def notify(self, path, mask):
- self.magicfolder._notifier.event(path, mask)
+ self.magicfolder.uploader._notifier.event(path, mask)
def test_errors(self):
self.set_up_grid()