From: Daira Hopwood Date: Mon, 28 Dec 2015 16:00:35 +0000 (+0000) Subject: WIP. X-Git-Url: https://git.rkrishnan.org/pf/$top_link?a=commitdiff_plain;h=6f3914fa89633b4c80f6fead4b084939f5c6053a;p=tahoe-lafs%2Ftahoe-lafs.git WIP. Signed-off-by: Daira Hopwood --- diff --git a/src/allmydata/backupdb.py b/src/allmydata/backupdb.py index a51356ca..8fecb854 100644 --- a/src/allmydata/backupdb.py +++ b/src/allmydata/backupdb.py @@ -21,7 +21,8 @@ CREATE TABLE version CREATE TABLE local_files ( path VARCHAR(1024) PRIMARY KEY, -- index, this is an absolute UTF-8-encoded local filename - size INTEGER, -- os.stat(fn)[stat.ST_SIZE] + -- note that size is before mtime and ctime here, but after in function parameters + size INTEGER, -- os.stat(fn)[stat.ST_SIZE] (NULL if the file has been deleted) mtime NUMBER, -- os.stat(fn)[stat.ST_MTIME] ctime NUMBER, -- os.stat(fn)[stat.ST_CTIME] fileid INTEGER%s @@ -186,9 +187,9 @@ class BackupDB: is not healthy, please upload the file and call r.did_upload(filecap) when you're done. - If use_timestamps=True (the default), I will compare ctime and mtime + If use_timestamps=True (the default), I will compare mtime and ctime of the local file against an entry in my database, and consider the - file to be unchanged if ctime, mtime, and filesize are all the same + file to be unchanged if mtime, ctime, and filesize are all the same as the earlier version. If use_timestamps=False, I will not trust the timestamps, so more files (perhaps all) will be marked as needing upload. A future version of this database may hash the file to make @@ -200,10 +201,12 @@ class BackupDB: """ path = abspath_expanduser_unicode(path) + + # XXX consider using get_pathinfo s = os.stat(path) size = s[stat.ST_SIZE] - ctime = s[stat.ST_CTIME] mtime = s[stat.ST_MTIME] + ctime = s[stat.ST_CTIME] now = time.time() c = self.cursor @@ -368,34 +371,33 @@ class BackupDB: class MagicFolderDB(BackupDB): VERSION = 3 - def get_all_files(self): - """Retreive a list of all files that have had an entry in magic-folder db - (files that have been downloaded at least once). + def get_all_relpaths(self): + """ + Retrieve a list of all relpaths of files that have had an entry in magic folder db + (i.e. that have been downloaded at least once). """ self.cursor.execute("SELECT path FROM local_files") rows = self.cursor.fetchall() - if not rows: - return None - else: - return rows + return set([r[0] for r in rows]) - def get_local_file_version(self, path): - """I will tell you the version of a local file tracked by our magic folder db. - If no db entry found then I'll return None. + def get_local_file_version(self, relpath_u): + """ + Return the version of a local file tracked by our magic folder db. + If no db entry is found then return None. """ c = self.cursor c.execute("SELECT version, fileid" " FROM local_files" " WHERE path=?", - (path,)) + (relpath_u,)) row = self.cursor.fetchone() if not row: return None else: return row[0] - def did_upload_file(self, filecap, path, version, mtime, ctime, size): - #print "_did_upload_file(%r, %r, %r, %r, %r, %r)" % (filecap, path, version, mtime, ctime, size) + def did_upload_version(self, filecap, relpath_u, version, pathinfo): + #print "did_upload_version(%r, %r, %r, %r)" % (filecap, relpath_u, version, pathinfo) now = time.time() fileid = self.get_or_allocate_fileid_for_cap(filecap) try: @@ -408,35 +410,26 @@ class MagicFolderDB(BackupDB): (now, now, fileid)) try: self.cursor.execute("INSERT INTO local_files VALUES (?,?,?,?,?,?)", - (path, size, mtime, ctime, fileid, version)) + (relpath_u, pathinfo.size, pathinfo.mtime, pathinfo.ctime, fileid, version)) except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError): self.cursor.execute("UPDATE local_files" " SET size=?, mtime=?, ctime=?, fileid=?, version=?" " WHERE path=?", - (size, mtime, ctime, fileid, version, path)) + (pathinfo.size, pathinfo.mtime, pathinfo.ctime, fileid, version, relpath_u)) self.connection.commit() - def is_new_file_time(self, path, relpath_u): - """recent_file_time returns true if the file is recent... - meaning its current statinfo (i.e. size, ctime, and mtime) matched the statinfo - that was previously stored in the db. + def is_new_file(self, pathinfo, relpath_u): """ - #print "check_file_time %s %s" % (path, relpath_u) - path = abspath_expanduser_unicode(path) - s = os.stat(path) - size = s[stat.ST_SIZE] - ctime = s[stat.ST_CTIME] - mtime = s[stat.ST_MTIME] + Returns true if the file's current pathinfo (size, mtime, and ctime) has + changed from the pathinfo previously stored in the db. + """ + #print "is_new_file(%r, %r)" % (pathinfo, relpath_u) c = self.cursor - c.execute("SELECT size,mtime,ctime,fileid" + c.execute("SELECT size, mtime, ctime" " FROM local_files" " WHERE path=?", (relpath_u,)) row = self.cursor.fetchone() if not row: return True - (last_size,last_mtime,last_ctime,last_fileid) = row - if (size, ctime, mtime) == (last_size, last_ctime, last_mtime): - return False - else: - return True + return (pathinfo.size, pathinfo.mtime, pathinfo.ctime) != row diff --git a/src/allmydata/client.py b/src/allmydata/client.py index e1f3c595..f86c6fc2 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -129,6 +129,7 @@ class Client(node.Node, pollmixin.PollMixin): } def __init__(self, basedir="."): + #print "Client.__init__(%r)" % (basedir,) node.Node.__init__(self, basedir) self.connected_enough_d = defer.Deferred() self.started_timestamp = time.time() @@ -513,7 +514,7 @@ class Client(node.Node, pollmixin.PollMixin): from allmydata.frontends import magic_folder - s = magic_folder.MagicFolder(self, upload_dircap, local_dir, dbfile) + s = magic_folder.MagicFolder(self, upload_dircap, collective_dircap, local_dir, dbfile) s.setServiceParent(self) s.startService() diff --git a/src/allmydata/frontends/magic_folder.py b/src/allmydata/frontends/magic_folder.py index ee820b87..422a7c11 100644 --- a/src/allmydata/frontends/magic_folder.py +++ b/src/allmydata/frontends/magic_folder.py @@ -1,149 +1,596 @@ -import sys +import sys, os +import os.path +from collections import deque +import time -from twisted.internet import defer -from twisted.python.filepath import FilePath +from twisted.internet import defer, reactor, task +from twisted.python.failure import Failure +from twisted.python import runtime from twisted.application import service -from foolscap.api import eventually +from allmydata.util import fileutil from allmydata.interfaces import IDirectoryNode +from allmydata.util import log +from allmydata.util.fileutil import precondition_abspath, get_pathinfo +from allmydata.util.assertutil import precondition +from allmydata.util.deferredutil import HookMixin +from allmydata.util.encodingutil import listdir_filepath, to_filepath, \ + extend_filepath, unicode_from_filepath, unicode_segments_from, \ + quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError +from allmydata.immutable.upload import FileName, Data +from allmydata import backupdb, magicpath -from allmydata.util.fileutil import abspath_expanduser_unicode, precondition_abspath -from allmydata.util.encodingutil import listdir_unicode, to_filepath, \ - unicode_from_filepath, quote_local_unicode_path, FilenameEncodingError -from allmydata.immutable.upload import FileName -from allmydata import backupdb +IN_EXCL_UNLINK = 0x04000000L + +def get_inotify_module(): + try: + if sys.platform == "win32": + from allmydata.windows import inotify + elif runtime.platform.supportsINotify(): + from twisted.internet import inotify + else: + raise NotImplementedError("filesystem notification needed for drop-upload is not supported.\n" + "This currently requires Linux or Windows.") + return inotify + except (ImportError, AttributeError) as e: + log.msg(e) + if sys.platform == "win32": + raise NotImplementedError("filesystem notification needed for drop-upload is not supported.\n" + "Windows support requires at least Vista, and has only been tested on Windows 7.") + raise class MagicFolder(service.MultiService): name = 'magic-folder' - def __init__(self, client, upload_dircap, local_dir, dbfile, inotify=None, - pending_delay=1.0): - precondition_abspath(local_dir) + def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, + pending_delay=1.0, clock=reactor): + precondition_abspath(local_path_u) service.MultiService.__init__(self) - self._local_dir = abspath_expanduser_unicode(local_dir) + + db = backupdb.get_backupdb(dbfile, create_version=(backupdb.SCHEMA_v3, 3)) + if db is None: + return Failure(Exception('ERROR: Unable to load magic folder db.')) + + # for tests self._client = client - self._stats_provider = client.stats_provider - self._convergence = client.convergence - self._local_path = to_filepath(self._local_dir) - self._dbfile = dbfile + self._db = db - self.is_upload_ready = False + self.is_ready = False - if inotify is None: - if sys.platform == "win32": - from allmydata.windows import inotify - else: - from twisted.internet import inotify - self._inotify = inotify + self.uploader = Uploader(client, local_path_u, db, upload_dircap, pending_delay, clock) + self.downloader = Downloader(client, local_path_u, db, collective_dircap, clock) + + def startService(self): + # TODO: why is this being called more than once? + if self.running: + return defer.succeed(None) + print "%r.startService" % (self,) + service.MultiService.startService(self) + return self.uploader.start_monitoring() + + def ready(self): + """ready is used to signal us to start + processing the upload and download items... + """ + self.is_ready = True + d = self.uploader.start_scanning() + d2 = self.downloader.start_scanning() + d.addCallback(lambda ign: d2) + return d - if not self._local_path.exists(): + def finish(self): + print "finish" + d = self.uploader.stop() + d2 = self.downloader.stop() + d.addCallback(lambda ign: d2) + return d + + def remove_service(self): + return service.MultiService.disownServiceParent(self) + + +class QueueMixin(HookMixin): + def __init__(self, client, local_path_u, db, name, clock): + self._client = client + self._local_path_u = local_path_u + self._local_filepath = to_filepath(local_path_u) + self._db = db + self._name = name + self._clock = clock + self._hooks = {'processed': None, 'started': None} + self.started_d = self.set_hook('started') + + if not self._local_filepath.exists(): raise AssertionError("The '[magic_folder] local.directory' parameter was %s " "but there is no directory at that location." - % quote_local_unicode_path(local_dir)) - if not self._local_path.isdir(): + % quote_local_unicode_path(self._local_path_u)) + if not self._local_filepath.isdir(): raise AssertionError("The '[magic_folder] local.directory' parameter was %s " "but the thing at that location is not a directory." - % quote_local_unicode_path(local_dir)) + % quote_local_unicode_path(self._local_path_u)) + + self._deque = deque() + self._lazy_tail = defer.succeed(None) + self._pending = set() + self._stopped = False + self._turn_delay = 0 + + def _get_filepath(self, relpath_u): + return extend_filepath(self._local_filepath, relpath_u.split(u"/")) + + def _get_relpath(self, filepath): + print "_get_relpath(%r)" % (filepath,) + segments = unicode_segments_from(filepath, self._local_filepath) + print "segments = %r" % (segments,) + return u"/".join(segments) + + def _count(self, counter_name, delta=1): + ctr = 'magic_folder.%s.%s' % (self._name, counter_name) + print "%r += %r" % (ctr, delta) + self._client.stats_provider.count(ctr, delta) + + def _log(self, msg): + s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg) + self._client.log(s) + print s + #open("events", "ab+").write(msg) + + def _append_to_deque(self, relpath_u): + print "_append_to_deque(%r)" % (relpath_u,) + if relpath_u in self._pending or magicpath.should_ignore_file(relpath_u): + return + self._deque.append(relpath_u) + self._pending.add(relpath_u) + self._count('objects_queued') + if self.is_ready: + self._clock.callLater(0, self._turn_deque) + + def _turn_deque(self): + if self._stopped: + return + try: + item = self._deque.pop() + self._count('objects_queued', -1) + except IndexError: + self._log("deque is now empty") + self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty()) + else: + self._lazy_tail.addCallback(lambda ign: self._process(item)) + self._lazy_tail.addBoth(self._call_hook, 'processed') + self._lazy_tail.addErrback(log.err) + self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque)) + + +class Uploader(QueueMixin): + def __init__(self, client, local_path_u, db, upload_dircap, pending_delay, clock): + QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock) + + self.is_ready = False # TODO: allow a path rather than a cap URI. - self._parent = self._client.create_node_from_uri(upload_dircap) - if not IDirectoryNode.providedBy(self._parent): + self._upload_dirnode = self._client.create_node_from_uri(upload_dircap) + if not IDirectoryNode.providedBy(self._upload_dirnode): raise AssertionError("The URI in 'private/magic_folder_dircap' does not refer to a directory.") - if self._parent.is_unknown() or self._parent.is_readonly(): + if self._upload_dirnode.is_unknown() or self._upload_dirnode.is_readonly(): raise AssertionError("The URI in 'private/magic_folder_dircap' is not a writecap to a directory.") - self._uploaded_callback = lambda ign: None + self._inotify = get_inotify_module() + self._notifier = self._inotify.INotify() - self._notifier = inotify.INotify() if hasattr(self._notifier, 'set_pending_delay'): self._notifier.set_pending_delay(pending_delay) # We don't watch for IN_CREATE, because that would cause us to read and upload a # possibly-incomplete file before the application has closed it. There should always # be an IN_CLOSE_WRITE after an IN_CREATE (I think). - # TODO: what about IN_MOVE_SELF or IN_UNMOUNT? - mask = inotify.IN_CLOSE_WRITE | inotify.IN_MOVED_TO | inotify.IN_ONLYDIR - self._notifier.watch(self._local_path, mask=mask, callbacks=[self._notify]) - - def _check_db_file(self, childpath): - # returns True if the file must be uploaded. - assert self._db != None - r = self._db.check_file(childpath) - filecap = r.was_uploaded() - if filecap is False: - return True + # TODO: what about IN_MOVE_SELF, IN_MOVED_FROM, or IN_UNMOUNT? + # + self.mask = ( self._inotify.IN_CLOSE_WRITE + | self._inotify.IN_MOVED_TO + | self._inotify.IN_MOVED_FROM + | self._inotify.IN_DELETE + | self._inotify.IN_ONLYDIR + | IN_EXCL_UNLINK + ) + self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify], + recursive=True) - def startService(self): - self._db = backupdb.get_backupdb(self._dbfile) - if self._db is None: - return Failure(Exception('ERROR: Unable to load magic folder db.')) + def start_monitoring(self): + self._log("start_monitoring") + d = defer.succeed(None) + d.addCallback(lambda ign: self._notifier.startReading()) + d.addCallback(lambda ign: self._count('dirs_monitored')) + d.addBoth(self._call_hook, 'started') + return d - service.MultiService.startService(self) - d = self._notifier.startReading() - self._stats_provider.count('drop_upload.dirs_monitored', 1) + def stop(self): + self._log("stop") + self._notifier.stopReading() + self._count('dirs_monitored', -1) + if hasattr(self._notifier, 'wait_until_stopped'): + d = self._notifier.wait_until_stopped() + else: + d = defer.succeed(None) + d.addCallback(lambda ign: self._lazy_tail) return d - def upload_ready(self): - """upload_ready is used to signal us to start - processing the upload items... - """ - self.is_upload_ready = True + def start_scanning(self): + self._log("start_scanning") + self.is_ready = True + self._pending = self._db.get_all_relpaths() + print "all_files %r" % (self._pending) + d = self._scan(u"") + def _add_pending(ign): + # This adds all of the files that were in the db but not already processed + # (normally because they have been deleted on disk). + print "adding %r" % (self._pending) + self._deque.extend(self._pending) + d.addCallback(_add_pending) + d.addCallback(lambda ign: self._turn_deque()) + return d + + def _scan(self, reldir_u): + self._log("scan %r" % (reldir_u,)) + fp = self._get_filepath(reldir_u) + try: + children = listdir_filepath(fp) + except EnvironmentError: + raise Exception("WARNING: magic folder: permission denied on directory %s" + % quote_filepath(fp)) + except FilenameEncodingError: + raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error" + % quote_filepath(fp)) + + d = defer.succeed(None) + for child in children: + assert isinstance(child, unicode), child + d.addCallback(lambda ign, child=child: + ("%s/%s" % (reldir_u, child) if reldir_u else child)) + def _add_pending(relpath_u): + if magicpath.should_ignore_file(relpath_u): + return None + + self._pending.add(relpath_u) + return relpath_u + d.addCallback(_add_pending) + # This call to _process doesn't go through the deque, and probably should. + d.addCallback(self._process) + d.addBoth(self._call_hook, 'processed') + d.addErrback(log.err) + + return d def _notify(self, opaque, path, events_mask): self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask)))) + relpath_u = self._get_relpath(path) + self._append_to_deque(relpath_u) + + def _when_queue_is_empty(self): + return defer.succeed(None) - self._stats_provider.count('drop_upload.files_queued', 1) - eventually(self._process, opaque, path, events_mask) + def _process(self, relpath_u): + self._log("_process(%r)" % (relpath_u,)) + if relpath_u is None: + return + precondition(isinstance(relpath_u, unicode), relpath_u) - def _process(self, opaque, path, events_mask): d = defer.succeed(None) - # FIXME: if this already exists as a mutable file, we replace the directory entry, - # but we should probably modify the file (as the SFTP frontend does). - def _add_file(ign): - name = path.basename() - # on Windows the name is already Unicode - if not isinstance(name, unicode): - name = name.decode(get_filesystem_encoding()) - - u = FileName(path.path, self._convergence) - return self._parent.add_file(name, u) - d.addCallback(_add_file) - - def _succeeded(ign): - self._stats_provider.count('drop_upload.files_queued', -1) - self._stats_provider.count('drop_upload.files_uploaded', 1) - def _failed(f): - self._stats_provider.count('drop_upload.files_queued', -1) - if path.exists(): - self._log("drop-upload: %r failed to upload due to %r" % (path.path, f)) - self._stats_provider.count('drop_upload.files_failed', 1) - return f + def _maybe_upload(val): + fp = self._get_filepath(relpath_u) + pathinfo = get_pathinfo(unicode_from_filepath(fp)) + + print "pending = %r, about to remove %r" % (self._pending, relpath_u) + self._pending.remove(relpath_u) + encoded_path_u = magicpath.path2magic(relpath_u) + + if not pathinfo.exists: + self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp)) + self._count('objects_disappeared') + d2 = defer.succeed(None) + if self._db.check_file_db_exists(relpath_u): + d2.addCallback(lambda ign: self._get_metadata(encoded_path_u)) + current_version = self._db.get_local_file_version(relpath_u) + 1 + def set_deleted(metadata): + metadata['version'] = current_version + metadata['deleted'] = True + empty_uploadable = Data("", self._client.convergence) + return self._upload_dirnode.add_file(encoded_path_u, empty_uploadable, overwrite=True, metadata=metadata) + d2.addCallback(set_deleted) + def add_db_entry(filenode): + filecap = filenode.get_uri() + self._db.did_upload_version(filecap, relpath_u, current_version, pathinfo) + self._count('files_uploaded') + # FIXME consider whether it's correct to retrieve the filenode again. + d2.addCallback(lambda x: self._get_filenode(encoded_path_u)) + d2.addCallback(add_db_entry) + + d2.addCallback(lambda x: Exception("file does not exist")) # FIXME wrong + return d2 + elif pathinfo.islink: + self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp)) + return None + elif pathinfo.isdir: + self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True) + uploadable = Data("", self._client.convergence) + encoded_path_u += magicpath.path2magic(u"/") + upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True) + def _succeeded(ign): + self._log("created subdirectory %r" % (relpath_u,)) + self._count('directories_created') + def _failed(f): + self._log("failed to create subdirectory %r" % (relpath_u,)) + return f + upload_d.addCallbacks(_succeeded, _failed) + upload_d.addCallback(lambda ign: self._scan(relpath_u)) + return upload_d + elif pathinfo.isfile: + version = self._db.get_local_file_version(relpath_u) + if version is None: + version = 0 + elif self._db.is_new_file(pathinfo, relpath_u): + version += 1 + else: + return None + + uploadable = FileName(unicode_from_filepath(fp), self._client.convergence) + d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":version}, overwrite=True) + def add_db_entry(filenode): + filecap = filenode.get_uri() + self._db.did_upload_version(filecap, relpath_u, version, pathinfo) + self._count('files_uploaded') + d2.addCallback(add_db_entry) + return d2 else: - self._log("drop-upload: notified file %r disappeared " - "(this is normal for temporary files): %r" % (path.path, f)) - self._stats_provider.count('drop_upload.files_disappeared', 1) + self.warn("WARNING: cannot process special file %s" % quote_filepath(fp)) return None + + d.addCallback(_maybe_upload) + + def _succeeded(res): + self._count('objects_succeeded') + return res + def _failed(f): + print f + self._count('objects_failed') + self._log("%r while processing %r" % (f, relpath_u)) + return f d.addCallbacks(_succeeded, _failed) - d.addBoth(self._uploaded_callback) return d - def set_uploaded_callback(self, callback): - """This sets a function that will be called after a file has been uploaded.""" - self._uploaded_callback = callback + def _get_metadata(self, encoded_path_u): + try: + d = self._upload_dirnode.get_metadata_for(encoded_path_u) + except KeyError: + return Failure() + return d - def finish(self, for_tests=False): - self._notifier.stopReading() - self._stats_provider.count('drop_upload.dirs_monitored', -1) - if for_tests and hasattr(self._notifier, 'wait_until_stopped'): - return self._notifier.wait_until_stopped() + def _get_filenode(self, encoded_path_u): + try: + d = self._upload_dirnode.get(encoded_path_u) + except KeyError: + return Failure() + return d + + +class Downloader(QueueMixin): + def __init__(self, client, local_path_u, db, collective_dircap, clock): + QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock) + + # TODO: allow a path rather than a cap URI. + self._collective_dirnode = self._client.create_node_from_uri(collective_dircap) + + if not IDirectoryNode.providedBy(self._collective_dirnode): + raise AssertionError("The URI in 'private/collective_dircap' does not refer to a directory.") + if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly(): + raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.") + + self._turn_delay = 3 # delay between remote scans + self._download_scan_batch = {} # path -> [(filenode, metadata)] + + def start_scanning(self): + self._log("\nstart_scanning") + files = self._db.get_all_relpaths() + self._log("all files %s" % files) + + d = self._scan_remote_collective() + self._turn_deque() + return d + + def stop(self): + self._stopped = True + d = defer.succeed(None) + d.addCallback(lambda ign: self._lazy_tail) + return d + + def _should_download(self, relpath_u, remote_version): + """ + _should_download returns a bool indicating whether or not a remote object should be downloaded. + We check the remote metadata version against our magic-folder db version number; + latest version wins. + """ + if magicpath.should_ignore_file(relpath_u): + return False + v = self._db.get_local_file_version(relpath_u) + return (v is None or v < remote_version) + + def _get_local_latest(self, relpath_u): + """ + _get_local_latest takes a unicode path string checks to see if this file object + exists in our magic-folder db; if not then return None + else check for an entry in our magic-folder db and return the version number. + """ + if not self._get_filepath(relpath_u).exists(): + return None + return self._db.get_local_file_version(relpath_u) + + def _get_collective_latest_file(self, filename): + """ + _get_collective_latest_file takes a file path pointing to a file managed by + magic-folder and returns a deferred that fires with the two tuple containing a + file node and metadata for the latest version of the file located in the + magic-folder collective directory. + """ + collective_dirmap_d = self._collective_dirnode.list() + def scan_collective(result): + list_of_deferreds = [] + for dir_name in result.keys(): + # XXX make sure it's a directory + d = defer.succeed(None) + d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename)) + list_of_deferreds.append(d) + deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True) + return deferList + collective_dirmap_d.addCallback(scan_collective) + def highest_version(deferredList): + max_version = 0 + metadata = None + node = None + for success, result in deferredList: + if success: + if result[1]['version'] > max_version: + node, metadata = result + max_version = result[1]['version'] + return node, metadata + collective_dirmap_d.addCallback(highest_version) + return collective_dirmap_d + + def _append_to_batch(self, name, file_node, metadata): + if self._download_scan_batch.has_key(name): + self._download_scan_batch[name] += [(file_node, metadata)] else: - return defer.succeed(None) + self._download_scan_batch[name] = [(file_node, metadata)] - def _log(self, msg): - self._client.log(msg) - #open("events", "ab+").write(msg) + def _scan_remote(self, nickname, dirnode): + self._log("_scan_remote nickname %r" % (nickname,)) + d = dirnode.list() + def scan_listing(listing_map): + for name in listing_map.keys(): + file_node, metadata = listing_map[name] + local_version = self._get_local_latest(name) + remote_version = metadata.get('version', None) + self._log("%r has local version %r, remote version %r" % (name, local_version, remote_version)) + if local_version is None or remote_version is None or local_version < remote_version: + self._log("added to download queue\n") + self._append_to_batch(name, file_node, metadata) + d.addCallback(scan_listing) + return d + + def _scan_remote_collective(self): + self._log("_scan_remote_collective") + self._download_scan_batch = {} # XXX + + if self._collective_dirnode is None: + return + collective_dirmap_d = self._collective_dirnode.list() + def do_list(result): + others = [x for x in result.keys()] + return result, others + collective_dirmap_d.addCallback(do_list) + def scan_collective(result): + d = defer.succeed(None) + collective_dirmap, others_list = result + for dir_name in others_list: + d.addCallback(lambda x, dir_name=dir_name: self._scan_remote(dir_name, collective_dirmap[dir_name][0])) + # XXX todo add errback + return d + collective_dirmap_d.addCallback(scan_collective) + collective_dirmap_d.addCallback(self._filter_scan_batch) + collective_dirmap_d.addCallback(self._add_batch_to_download_queue) + return collective_dirmap_d + + def _add_batch_to_download_queue(self, result): + print "result = %r" % (result,) + print "deque = %r" % (self._deque,) + self._deque.extend(result) + print "deque after = %r" % (self._deque,) + self._count('objects_queued', len(result)) + print "pending = %r" % (self._pending,) + self._pending.update(map(lambda x: x[0], result)) + print "pending after = %r" % (self._pending,) + + def _filter_scan_batch(self, result): + extension = [] # consider whether this should be a dict + for relpath_u in self._download_scan_batch.keys(): + if relpath_u in self._pending: + continue + file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version']) + if self._should_download(relpath_u, metadata['version']): + extension += [(relpath_u, file_node, metadata)] + return extension + + def _when_queue_is_empty(self): + d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective) + d.addCallback(lambda ign: self._turn_deque()) + return d + + def _process(self, item): + (relpath_u, file_node, metadata) = item + d = file_node.download_best_version() + def succeeded(res): + fp = self._get_filepath(relpath_u) + abspath_u = unicode_from_filepath(fp) + d2 = defer.succeed(res) + d2.addCallback(lambda result: self._write_downloaded_file(abspath_u, result, is_conflict=False)) + def do_update_db(written_abspath_u): + filecap = file_node.get_uri() + written_pathinfo = get_pathinfo(written_abspath_u) + if not written_pathinfo.exists: + raise Exception("downloaded file %s disappeared" % quote_local_unicode_path(written_abspath_u)) + + self._db.did_upload_version(filecap, relpath_u, metadata['version'], written_pathinfo) + d2.addCallback(do_update_db) + # XXX handle failure here with addErrback... + self._count('objects_downloaded') + return d2 + def failed(f): + self._log("download failed: %s" % (str(f),)) + self._count('objects_download_failed') + return f + d.addCallbacks(succeeded, failed) + def remove_from_pending(res): + self._pending.remove(relpath_u) + return res + d.addBoth(remove_from_pending) + return d + + FUDGE_SECONDS = 10.0 + + @classmethod + def _write_downloaded_file(cls, abspath_u, file_contents, is_conflict=False, now=None): + # 1. Write a temporary file, say .foo.tmp. + # 2. is_conflict determines whether this is an overwrite or a conflict. + # 3. Set the mtime of the replacement file to be T seconds before the + # current local time. + # 4. Perform a file replacement with backup filename foo.backup, + # replaced file foo, and replacement file .foo.tmp. If any step of + # this operation fails, reclassify as a conflict and stop. + # + # Returns the path of the destination file. + + precondition_abspath(abspath_u) + replacement_path_u = abspath_u + u".tmp" # FIXME more unique + backup_path_u = abspath_u + u".backup" + if now is None: + now = time.time() + + fileutil.write(replacement_path_u, file_contents) + os.utime(replacement_path_u, (now, now - cls.FUDGE_SECONDS)) + if is_conflict: + return cls._rename_conflicted_file(abspath_u, replacement_path_u) + else: + try: + fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u) + return abspath_u + except fileutil.ConflictError: + return cls._rename_conflicted_file(abspath_u, replacement_path_u) + + @classmethod + def _rename_conflicted_file(self, abspath_u, replacement_path_u): + conflict_path_u = abspath_u + u".conflict" + fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u) + return conflict_path_u diff --git a/src/allmydata/magicpath.py b/src/allmydata/magicpath.py new file mode 100644 index 00000000..ba15ed58 --- /dev/null +++ b/src/allmydata/magicpath.py @@ -0,0 +1,27 @@ + +import re +import os.path + +from allmydata.util.assertutil import precondition + +def path2magic(path): + return re.sub(ur'[/@]', lambda m: {u'/': u'@_', u'@': u'@@'}[m.group(0)], path) + +def magic2path(path): + return re.sub(ur'@[_@]', lambda m: {u'@_': u'/', u'@@': u'@'}[m.group(0)], path) + + +IGNORE_SUFFIXES = [u'.backup', u'.tmp', u'.conflicted'] +IGNORE_PREFIXES = [u'.'] + +def should_ignore_file(path_u): + precondition(isinstance(path_u, unicode), path_u=path_u) + + for suffix in IGNORE_SUFFIXES: + if path_u.endswith(suffix): + return True + while path_u != u"": + path_u, tail_u = os.path.split(path_u) + if tail_u.startswith(u"."): + return True + return False diff --git a/src/allmydata/scripts/common.py b/src/allmydata/scripts/common.py index d6246fc0..3bfe9750 100644 --- a/src/allmydata/scripts/common.py +++ b/src/allmydata/scripts/common.py @@ -57,9 +57,14 @@ class BasedirOptions(BaseOptions): ] def parseArgs(self, basedir=None): - if self.parent['node-directory'] and self['basedir']: + # This finds the node-directory option correctly even if we are in a subcommand. + root = self.parent + while root.parent is not None: + root = root.parent + + if root['node-directory'] and self['basedir']: raise usage.UsageError("The --node-directory (or -d) and --basedir (or -C) options cannot both be used.") - if self.parent['node-directory'] and basedir: + if root['node-directory'] and basedir: raise usage.UsageError("The --node-directory (or -d) option and a basedir argument cannot both be used.") if self['basedir'] and basedir: raise usage.UsageError("The --basedir (or -C) option and a basedir argument cannot both be used.") @@ -68,13 +73,14 @@ class BasedirOptions(BaseOptions): b = argv_to_abspath(basedir) elif self['basedir']: b = argv_to_abspath(self['basedir']) - elif self.parent['node-directory']: - b = argv_to_abspath(self.parent['node-directory']) + elif root['node-directory']: + b = argv_to_abspath(root['node-directory']) elif self.default_nodedir: b = self.default_nodedir else: raise usage.UsageError("No default basedir available, you must provide one with --node-directory, --basedir, or a basedir argument") self['basedir'] = b + self['node-directory'] = b def postOptions(self): if not self['basedir']: diff --git a/src/allmydata/scripts/magic_folder_cli.py b/src/allmydata/scripts/magic_folder_cli.py new file mode 100644 index 00000000..1cf20c97 --- /dev/null +++ b/src/allmydata/scripts/magic_folder_cli.py @@ -0,0 +1,182 @@ + +import os +from cStringIO import StringIO +from twisted.python import usage + +from .common import BaseOptions, BasedirOptions, get_aliases +from .cli import MakeDirectoryOptions, LnOptions, CreateAliasOptions +import tahoe_mv +from allmydata.util import fileutil +from allmydata import uri + +INVITE_SEPARATOR = "+" + +class CreateOptions(BasedirOptions): + nickname = None + localdir = None + synopsis = "MAGIC_ALIAS: [NICKNAME LOCALDIR]" + def parseArgs(self, alias, nickname=None, localdir=None): + BasedirOptions.parseArgs(self) + if not alias.endswith(':'): + raise usage.UsageError("An alias must end with a ':' character.") + self.alias = alias[:-1] + self.nickname = nickname + self.localdir = localdir + if self.nickname and not self.localdir: + raise usage.UsageError("If NICKNAME is specified then LOCALDIR must also be specified.") + node_url_file = os.path.join(self['node-directory'], "node.url") + self['node-url'] = fileutil.read(node_url_file).strip() + +def _delegate_options(source_options, target_options): + target_options.aliases = get_aliases(source_options['node-directory']) + target_options["node-url"] = source_options["node-url"] + target_options["node-directory"] = source_options["node-directory"] + target_options.stdin = StringIO("") + target_options.stdout = StringIO() + target_options.stderr = StringIO() + return target_options + +def create(options): + from allmydata.scripts import tahoe_add_alias + create_alias_options = _delegate_options(options, CreateAliasOptions()) + create_alias_options.alias = options.alias + + rc = tahoe_add_alias.create_alias(create_alias_options) + if rc != 0: + print >>options.stderr, create_alias_options.stderr.getvalue() + return rc + print >>options.stdout, create_alias_options.stdout.getvalue() + + if options.nickname is not None: + invite_options = _delegate_options(options, InviteOptions()) + invite_options.alias = options.alias + invite_options.nickname = options.nickname + rc = invite(invite_options) + if rc != 0: + print >>options.stderr, "magic-folder: failed to invite after create\n" + print >>options.stderr, invite_options.stderr.getvalue() + return rc + invite_code = invite_options.stdout.getvalue().strip() + + join_options = _delegate_options(options, JoinOptions()) + join_options.invite_code = invite_code + fields = invite_code.split(INVITE_SEPARATOR) + if len(fields) != 2: + raise usage.UsageError("Invalid invite code.") + join_options.magic_readonly_cap, join_options.dmd_write_cap = fields + join_options.local_dir = options.localdir + rc = join(join_options) + if rc != 0: + print >>options.stderr, "magic-folder: failed to join after create\n" + print >>options.stderr, join_options.stderr.getvalue() + return rc + return 0 + +class InviteOptions(BasedirOptions): + nickname = None + synopsis = "MAGIC_ALIAS: NICKNAME" + stdin = StringIO("") + def parseArgs(self, alias, nickname=None): + BasedirOptions.parseArgs(self) + if not alias.endswith(':'): + raise usage.UsageError("An alias must end with a ':' character.") + self.alias = alias[:-1] + self.nickname = nickname + node_url_file = os.path.join(self['node-directory'], "node.url") + self['node-url'] = open(node_url_file, "r").read().strip() + aliases = get_aliases(self['node-directory']) + self.aliases = aliases + +def invite(options): + from allmydata.scripts import tahoe_mkdir + mkdir_options = _delegate_options(options, MakeDirectoryOptions()) + mkdir_options.where = None + + rc = tahoe_mkdir.mkdir(mkdir_options) + if rc != 0: + print >>options.stderr, "magic-folder: failed to mkdir\n" + return rc + dmd_write_cap = mkdir_options.stdout.getvalue().strip() + dmd_readonly_cap = unicode(uri.from_string(dmd_write_cap).get_readonly().to_string(), 'utf-8') + if dmd_readonly_cap is None: + print >>options.stderr, "magic-folder: failed to diminish dmd write cap\n" + return 1 + + magic_write_cap = get_aliases(options["node-directory"])[options.alias] + magic_readonly_cap = unicode(uri.from_string(magic_write_cap).get_readonly().to_string(), 'utf-8') + # tahoe ln CLIENT_READCAP COLLECTIVE_WRITECAP/NICKNAME + ln_options = _delegate_options(options, LnOptions()) + ln_options.from_file = dmd_readonly_cap + ln_options.to_file = "%s/%s" % (magic_write_cap, options.nickname) + rc = tahoe_mv.mv(ln_options, mode="link") + if rc != 0: + print >>options.stderr, "magic-folder: failed to create link\n" + print >>options.stderr, ln_options.stderr.getvalue() + return rc + + print >>options.stdout, "%s%s%s" % (magic_readonly_cap, INVITE_SEPARATOR, dmd_write_cap) + return 0 + +class JoinOptions(BasedirOptions): + synopsis = "INVITE_CODE LOCAL_DIR" + dmd_write_cap = "" + magic_readonly_cap = "" + def parseArgs(self, invite_code, local_dir): + BasedirOptions.parseArgs(self) + self.local_dir = local_dir + fields = invite_code.split(INVITE_SEPARATOR) + if len(fields) != 2: + raise usage.UsageError("Invalid invite code.") + self.magic_readonly_cap, self.dmd_write_cap = fields + +def join(options): + dmd_cap_file = os.path.join(options["node-directory"], "private/magic_folder_dircap") + collective_readcap_file = os.path.join(options["node-directory"], "private/collective_dircap") + + fileutil.write(dmd_cap_file, options.dmd_write_cap) + fileutil.write(collective_readcap_file, options.magic_readonly_cap) + fileutil.write(os.path.join(options["node-directory"], "tahoe.cfg"), + "[magic_folder]\nenabled = True\nlocal.directory = %s\n" + % (options.local_dir.encode('utf-8'),), mode="ab") + return 0 + +class MagicFolderCommand(BaseOptions): + subCommands = [ + ["create", None, CreateOptions, "Create a Magic Folder."], + ["invite", None, InviteOptions, "Invite someone to a Magic Folder."], + ["join", None, JoinOptions, "Join a Magic Folder."], + ] + def postOptions(self): + if not hasattr(self, 'subOptions'): + raise usage.UsageError("must specify a subcommand") + def getSynopsis(self): + return "Usage: tahoe [global-options] magic SUBCOMMAND" + def getUsage(self, width=None): + t = BaseOptions.getUsage(self, width) + t += """\ +Please run e.g. 'tahoe magic-folder create --help' for more details on each +subcommand. +""" + return t + +subDispatch = { + "create": create, + "invite": invite, + "join": join, +} + +def do_magic_folder(options): + so = options.subOptions + so.stdout = options.stdout + so.stderr = options.stderr + f = subDispatch[options.subCommand] + return f(so) + +subCommands = [ + ["magic-folder", None, MagicFolderCommand, + "Magic Folder subcommands: use 'tahoe magic-folder' for a list."], +] + +dispatch = { + "magic-folder": do_magic_folder, +} diff --git a/src/allmydata/scripts/runner.py b/src/allmydata/scripts/runner.py index c331eee7..a029b34a 100644 --- a/src/allmydata/scripts/runner.py +++ b/src/allmydata/scripts/runner.py @@ -5,7 +5,8 @@ from cStringIO import StringIO from twisted.python import usage from allmydata.scripts.common import get_default_nodedir -from allmydata.scripts import debug, create_node, startstop_node, cli, keygen, stats_gatherer, admin +from allmydata.scripts import debug, create_node, startstop_node, cli, keygen, stats_gatherer, admin, \ +magic_folder_cli from allmydata.util.encodingutil import quote_output, quote_local_unicode_path, get_io_encoding def GROUP(s): @@ -45,6 +46,7 @@ class Options(usage.Options): + debug.subCommands + GROUP("Using the filesystem") + cli.subCommands + + magic_folder_cli.subCommands ) optFlags = [ @@ -143,6 +145,8 @@ def runner(argv, rc = admin.dispatch[command](so) elif command in cli.dispatch: rc = cli.dispatch[command](so) + elif command in magic_folder_cli.dispatch: + rc = magic_folder_cli.dispatch[command](so) elif command in ac_dispatch: rc = ac_dispatch[command](so, stdout, stderr) else: diff --git a/src/allmydata/scripts/tahoe_ls.py b/src/allmydata/scripts/tahoe_ls.py index 78eea1f2..9a8ce240 100644 --- a/src/allmydata/scripts/tahoe_ls.py +++ b/src/allmydata/scripts/tahoe_ls.py @@ -151,9 +151,7 @@ def list(options): line.append(uri) if options["readonly-uri"]: line.append(quote_output(ro_uri or "-", quotemarks=False)) - rows.append((encoding_error, line)) - max_widths = [] left_justifys = [] for (encoding_error, row) in rows: diff --git a/src/allmydata/test/no_network.py b/src/allmydata/test/no_network.py index 8dd9a2f9..2dc59381 100644 --- a/src/allmydata/test/no_network.py +++ b/src/allmydata/test/no_network.py @@ -20,6 +20,9 @@ from twisted.internet import defer, reactor from twisted.python.failure import Failure from foolscap.api import Referenceable, fireEventually, RemoteException from base64 import b32encode + +from allmydata.util.assertutil import _assert + from allmydata import uri as tahoe_uri from allmydata.client import Client from allmydata.storage.server import StorageServer, storage_index_to_dir @@ -174,6 +177,9 @@ class NoNetworkStorageBroker: return None class NoNetworkClient(Client): + + def disownServiceParent(self): + self.disownServiceParent() def create_tub(self): pass def init_introducer_client(self): @@ -232,6 +238,7 @@ class NoNetworkGrid(service.MultiService): self.proxies_by_id = {} # maps to IServer on which .rref is a wrapped # StorageServer self.clients = [] + self.client_config_hooks = client_config_hooks for i in range(num_servers): ss = self.make_server(i) @@ -239,30 +246,42 @@ class NoNetworkGrid(service.MultiService): self.rebuild_serverlist() for i in range(num_clients): - clientid = hashutil.tagged_hash("clientid", str(i))[:20] - clientdir = os.path.join(basedir, "clients", - idlib.shortnodeid_b2a(clientid)) - fileutil.make_dirs(clientdir) - f = open(os.path.join(clientdir, "tahoe.cfg"), "w") + c = self.make_client(i) + self.clients.append(c) + + def make_client(self, i, write_config=True): + clientid = hashutil.tagged_hash("clientid", str(i))[:20] + clientdir = os.path.join(self.basedir, "clients", + idlib.shortnodeid_b2a(clientid)) + fileutil.make_dirs(clientdir) + + tahoe_cfg_path = os.path.join(clientdir, "tahoe.cfg") + if write_config: + f = open(tahoe_cfg_path, "w") f.write("[node]\n") f.write("nickname = client-%d\n" % i) f.write("web.port = tcp:0:interface=127.0.0.1\n") f.write("[storage]\n") f.write("enabled = false\n") f.close() - c = None - if i in client_config_hooks: - # this hook can either modify tahoe.cfg, or return an - # entirely new Client instance - c = client_config_hooks[i](clientdir) - if not c: - c = NoNetworkClient(clientdir) - c.set_default_mutable_keysize(TEST_RSA_KEY_SIZE) - c.nodeid = clientid - c.short_nodeid = b32encode(clientid).lower()[:8] - c._servers = self.all_servers # can be updated later - c.setServiceParent(self) - self.clients.append(c) + else: + _assert(os.path.exists(tahoe_cfg_path), tahoe_cfg_path=tahoe_cfg_path) + + c = None + if i in self.client_config_hooks: + # this hook can either modify tahoe.cfg, or return an + # entirely new Client instance + c = self.client_config_hooks[i](clientdir) + + if not c: + c = NoNetworkClient(clientdir) + c.set_default_mutable_keysize(TEST_RSA_KEY_SIZE) + + c.nodeid = clientid + c.short_nodeid = b32encode(clientid).lower()[:8] + c._servers = self.all_servers # can be updated later + c.setServiceParent(self) + return c def make_server(self, i, readonly=False): serverid = hashutil.tagged_hash("serverid", str(i))[:20] @@ -350,6 +369,9 @@ class GridTestMixin: num_servers=num_servers, client_config_hooks=client_config_hooks) self.g.setServiceParent(self.s) + self._record_webports_and_baseurls() + + def _record_webports_and_baseurls(self): self.client_webports = [c.getServiceNamed("webish").getPortnum() for c in self.g.clients] self.client_baseurls = [c.getServiceNamed("webish").getURL() @@ -358,6 +380,23 @@ class GridTestMixin: def get_clientdir(self, i=0): return self.g.clients[i].basedir + def set_clientdir(self, basedir, i=0): + self.g.clients[i].basedir = basedir + + def get_client(self, i=0): + return self.g.clients[i] + + def restart_client(self, i=0): + client = self.g.clients[i] + d = defer.succeed(None) + d.addCallback(lambda ign: self.g.removeService(client)) + def _make_client(ign): + c = self.g.make_client(i, write_config=False) + self.g.clients[i] = c + self._record_webports_and_baseurls() + d.addCallback(_make_client) + return d + def get_serverdir(self, i): return self.g.servers_by_number[i].storedir diff --git a/src/allmydata/test/test_cli.py b/src/allmydata/test/test_cli.py index b59a2b1b..9cfaf659 100644 --- a/src/allmydata/test/test_cli.py +++ b/src/allmydata/test/test_cli.py @@ -49,8 +49,11 @@ def parse_options(basedir, command, args): class CLITestMixin(ReallyEqualMixin): def do_cli(self, verb, *args, **kwargs): + # client_num is used to execute client CLI commands on a specific client. + client_num = kwargs.get("client_num", 0) + nodeargs = [ - "--node-directory", self.get_clientdir(), + "--node-directory", self.get_clientdir(i=client_num), ] argv = nodeargs + [verb] + list(args) stdin = kwargs.get("stdin", "") diff --git a/src/allmydata/test/test_cli_backup.py b/src/allmydata/test/test_cli_backup.py index 3bd2a614..a48295de 100644 --- a/src/allmydata/test/test_cli_backup.py +++ b/src/allmydata/test/test_cli_backup.py @@ -11,7 +11,8 @@ from allmydata.util import fileutil from allmydata.util.fileutil import abspath_expanduser_unicode from allmydata.util.encodingutil import get_io_encoding, unicode_to_argv from allmydata.util.namespace import Namespace -from allmydata.scripts import cli, backupdb +from allmydata.scripts import cli +from allmydata import backupdb from .common_util import StallMixin from .no_network import GridTestMixin from .test_cli import CLITestMixin, parse_options diff --git a/src/allmydata/test/test_cli_magic_folder.py b/src/allmydata/test/test_cli_magic_folder.py new file mode 100644 index 00000000..1ba831ac --- /dev/null +++ b/src/allmydata/test/test_cli_magic_folder.py @@ -0,0 +1,206 @@ +import os.path +import re + +from twisted.trial import unittest +from twisted.internet import defer +from twisted.internet import reactor + +from allmydata.util import fileutil +from allmydata.scripts.common import get_aliases +from allmydata.test.no_network import GridTestMixin +from .test_cli import CLITestMixin +from allmydata.scripts import magic_folder_cli +from allmydata.util.fileutil import abspath_expanduser_unicode +from allmydata.frontends.magic_folder import MagicFolder +from allmydata import uri + + +class MagicFolderCLITestMixin(CLITestMixin, GridTestMixin): + + def do_create_magic_folder(self, client_num): + d = self.do_cli("magic-folder", "create", "magic:", client_num=client_num) + def _done((rc,stdout,stderr)): + self.failUnlessEqual(rc, 0) + self.failUnlessIn("Alias 'magic' created", stdout) + self.failUnlessEqual(stderr, "") + aliases = get_aliases(self.get_clientdir(i=client_num)) + self.failUnlessIn("magic", aliases) + self.failUnless(aliases["magic"].startswith("URI:DIR2:")) + d.addCallback(_done) + return d + + def do_invite(self, client_num, nickname): + d = self.do_cli("magic-folder", "invite", u"magic:", nickname, client_num=client_num) + def _done((rc,stdout,stderr)): + self.failUnless(rc == 0) + return (rc,stdout,stderr) + d.addCallback(_done) + return d + + def do_join(self, client_num, local_dir, invite_code): + magic_readonly_cap, dmd_write_cap = invite_code.split(magic_folder_cli.INVITE_SEPARATOR) + d = self.do_cli("magic-folder", "join", invite_code, local_dir, client_num=client_num) + def _done((rc,stdout,stderr)): + self.failUnless(rc == 0) + return (rc,stdout,stderr) + d.addCallback(_done) + return d + + def check_joined_config(self, client_num, upload_dircap): + """Tests that our collective directory has the readonly cap of + our upload directory. + """ + collective_readonly_cap = fileutil.read(os.path.join(self.get_clientdir(i=client_num), "private/collective_dircap")) + d = self.do_cli("ls", "--json", collective_readonly_cap, client_num=client_num) + def _done((rc,stdout,stderr)): + self.failUnless(rc == 0) + return (rc,stdout,stderr) + d.addCallback(_done) + def test_joined_magic_folder((rc,stdout,stderr)): + readonly_cap = unicode(uri.from_string(upload_dircap).get_readonly().to_string(), 'utf-8') + s = re.search(readonly_cap, stdout) + self.failUnless(s is not None) + return None + d.addCallback(test_joined_magic_folder) + return d + + def get_caps_from_files(self, client_num): + collective_dircap = fileutil.read(os.path.join(self.get_clientdir(i=client_num), "private/collective_dircap")) + upload_dircap = fileutil.read(os.path.join(self.get_clientdir(i=client_num), "private/magic_folder_dircap")) + self.failIf(collective_dircap is None or upload_dircap is None) + return collective_dircap, upload_dircap + + def check_config(self, client_num, local_dir): + client_config = fileutil.read(os.path.join(self.get_clientdir(i=client_num), "tahoe.cfg")) + # XXX utf-8? + local_dir = local_dir.encode('utf-8') + ret = re.search("\[magic_folder\]\nenabled = True\nlocal.directory = %s" % (local_dir,), client_config) + self.failIf(ret is None) + + def create_invite_join_magic_folder(self, nickname, local_dir): + d = self.do_cli("magic-folder", "create", u"magic:", nickname, local_dir) + def _done((rc,stdout,stderr)): + self.failUnless(rc == 0) + return (rc,stdout,stderr) + d.addCallback(_done) + def get_alice_caps(x): + client = self.get_client() + self.collective_dircap, self.upload_dircap = self.get_caps_from_files(0) + self.collective_dirnode = client.create_node_from_uri(self.collective_dircap) + self.upload_dirnode = client.create_node_from_uri(self.upload_dircap) + d.addCallback(get_alice_caps) + d.addCallback(lambda x: self.check_joined_config(0, self.upload_dircap)) + d.addCallback(lambda x: self.check_config(0, local_dir)) + return d + + def cleanup(self, res): + #print "cleanup", res + d = defer.succeed(None) + if self.magicfolder is not None: + d.addCallback(lambda ign: self.magicfolder.finish()) + d.addCallback(lambda ign: res) + return d + + def init_magicfolder(self, client_num, upload_dircap, collective_dircap, local_magic_dir, clock): + dbfile = abspath_expanduser_unicode(u"magicfolderdb.sqlite", base=self.get_clientdir(i=client_num)) + magicfolder = MagicFolder(self.get_client(client_num), upload_dircap, collective_dircap, local_magic_dir, + dbfile, pending_delay=0.2, clock=clock) + magicfolder.setServiceParent(self.get_client(client_num)) + magicfolder.ready() + return magicfolder + + def setup_alice_and_bob(self, clock=reactor): + self.set_up_grid(num_clients=2) + + alice_magic_dir = abspath_expanduser_unicode(u"Alice-magic", base=self.basedir) + self.mkdir_nonascii(alice_magic_dir) + bob_magic_dir = abspath_expanduser_unicode(u"Bob-magic", base=self.basedir) + self.mkdir_nonascii(bob_magic_dir) + + # Alice creates a Magic Folder, + # invites herself then and joins. + d = self.do_create_magic_folder(0) + d.addCallback(lambda x: self.do_invite(0, u"Alice\u00F8")) + def get_invitecode(result): + self.invitecode = result[1].strip() + d.addCallback(get_invitecode) + d.addCallback(lambda x: self.do_join(0, alice_magic_dir, self.invitecode)) + def get_alice_caps(x): + self.alice_collective_dircap, self.alice_upload_dircap = self.get_caps_from_files(0) + d.addCallback(get_alice_caps) + d.addCallback(lambda x: self.check_joined_config(0, self.alice_upload_dircap)) + d.addCallback(lambda x: self.check_config(0, alice_magic_dir)) + def get_Alice_magicfolder(result): + self.alice_magicfolder = self.init_magicfolder(0, self.alice_upload_dircap, self.alice_collective_dircap, alice_magic_dir, clock) + return result + d.addCallback(get_Alice_magicfolder) + + # Alice invites Bob. Bob joins. + d.addCallback(lambda x: self.do_invite(0, u"Bob\u00F8")) + def get_invitecode(result): + self.invitecode = result[1].strip() + d.addCallback(get_invitecode) + d.addCallback(lambda x: self.do_join(1, bob_magic_dir, self.invitecode)) + def get_bob_caps(x): + self.bob_collective_dircap, self.bob_upload_dircap = self.get_caps_from_files(1) + d.addCallback(get_bob_caps) + d.addCallback(lambda x: self.check_joined_config(1, self.bob_upload_dircap)) + d.addCallback(lambda x: self.check_config(1, bob_magic_dir)) + def get_Bob_magicfolder(result): + self.bob_magicfolder = self.init_magicfolder(1, self.bob_upload_dircap, self.bob_collective_dircap, bob_magic_dir, clock) + return result + d.addCallback(get_Bob_magicfolder) + + def prepare_result(result): + # XXX improve this + return (self.alice_collective_dircap, self.alice_upload_dircap, self.alice_magicfolder, + self.bob_collective_dircap, self.bob_upload_dircap, self.bob_magicfolder) + d.addCallback(prepare_result) + return d + + +class CreateMagicFolder(MagicFolderCLITestMixin, unittest.TestCase): + + def test_create_and_then_invite_join(self): + self.basedir = "cli/MagicFolder/create-and-then-invite-join" + self.set_up_grid() + self.local_dir = os.path.join(self.basedir, "magic") + d = self.do_create_magic_folder(0) + d.addCallback(lambda x: self.do_invite(0, u"Alice")) + def get_invite((rc,stdout,stderr)): + self.invite_code = stdout.strip() + d.addCallback(get_invite) + d.addCallback(lambda x: self.do_join(0, self.local_dir, self.invite_code)) + def get_caps(x): + self.collective_dircap, self.upload_dircap = self.get_caps_from_files(0) + d.addCallback(get_caps) + d.addCallback(lambda x: self.check_joined_config(0, self.upload_dircap)) + d.addCallback(lambda x: self.check_config(0, self.local_dir)) + return d + + def test_create_error(self): + self.basedir = "cli/MagicFolder/create-error" + self.set_up_grid() + self.local_dir = os.path.join(self.basedir, "magic") + d = self.do_cli("magic-folder", "create", "m a g i c:", client_num=0) + def _done((rc,stdout,stderr)): + self.failIfEqual(rc, 0) + self.failUnlessIn("Alias names cannot contain spaces.", stderr) + d.addCallback(_done) + return d + + def test_create_invite_join(self): + self.basedir = "cli/MagicFolder/create-invite-join" + self.set_up_grid() + self.local_dir = os.path.join(self.basedir, "magic") + d = self.do_cli("magic-folder", "create", u"magic:", u"Alice", self.local_dir) + def _done((rc,stdout,stderr)): + self.failUnless(rc == 0) + return (rc,stdout,stderr) + d.addCallback(_done) + def get_caps(x): + self.collective_dircap, self.upload_dircap = self.get_caps_from_files(0) + d.addCallback(get_caps) + d.addCallback(lambda x: self.check_joined_config(0, self.upload_dircap)) + d.addCallback(lambda x: self.check_config(0, self.local_dir)) + return d diff --git a/src/allmydata/test/test_client.py b/src/allmydata/test/test_client.py index 23798f62..adf87420 100644 --- a/src/allmydata/test/test_client.py +++ b/src/allmydata/test/test_client.py @@ -318,15 +318,19 @@ class Basic(testutil.ReallyEqualMixin, testutil.NonASCIIPathMixin, unittest.Test class MockMagicFolder(service.MultiService): name = 'magic-folder' - def __init__(self, client, upload_dircap, local_dir, dbfile, inotify=None, + def __init__(self, client, upload_dircap, collective_dircap, local_dir, dbfile, inotify=None, pending_delay=1.0): service.MultiService.__init__(self) self.client = client self.upload_dircap = upload_dircap + self.collective_dircap = collective_dircap self.local_dir = local_dir self.dbfile = dbfile self.inotify = inotify + def ready(self): + pass + self.patch(allmydata.frontends.magic_folder, 'MagicFolder', MockMagicFolder) upload_dircap = "URI:DIR2:blah" @@ -340,12 +344,14 @@ class Basic(testutil.ReallyEqualMixin, testutil.NonASCIIPathMixin, unittest.Test basedir1 = "test_client.Basic.test_create_magic_folder_service1" os.mkdir(basedir1) + fileutil.write(os.path.join(basedir1, "tahoe.cfg"), config + "local.directory = " + local_dir_utf8 + "\n") self.failUnlessRaises(MissingConfigEntry, client.Client, basedir1) fileutil.write(os.path.join(basedir1, "tahoe.cfg"), config) fileutil.write(os.path.join(basedir1, "private", "magic_folder_dircap"), "URI:DIR2:blah") + fileutil.write(os.path.join(basedir1, "private", "collective_dircap"), "URI:DIR2:meow") self.failUnlessRaises(MissingConfigEntry, client.Client, basedir1) fileutil.write(os.path.join(basedir1, "tahoe.cfg"), @@ -365,15 +371,11 @@ class Basic(testutil.ReallyEqualMixin, testutil.NonASCIIPathMixin, unittest.Test class Boom(Exception): pass - def BoomMagicFolder(client, upload_dircap, local_dir_utf8, inotify=None): + def BoomMagicFolder(client, upload_dircap, collective_dircap, local_dir, dbfile, + inotify=None, pending_delay=1.0): raise Boom() self.patch(allmydata.frontends.magic_folder, 'MagicFolder', BoomMagicFolder) - logged_messages = [] - def mock_log(*args, **kwargs): - logged_messages.append("%r %r" % (args, kwargs)) - self.patch(allmydata.util.log, 'msg', mock_log) - basedir2 = "test_client.Basic.test_create_magic_folder_service2" os.mkdir(basedir2) os.mkdir(os.path.join(basedir2, "private")) @@ -383,10 +385,8 @@ class Basic(testutil.ReallyEqualMixin, testutil.NonASCIIPathMixin, unittest.Test "enabled = true\n" + "local.directory = " + local_dir_utf8 + "\n") fileutil.write(os.path.join(basedir2, "private", "magic_folder_dircap"), "URI:DIR2:blah") - c2 = client.Client(basedir2) - self.failUnlessRaises(KeyError, c2.getServiceNamed, 'magic-folder') - self.failUnless([True for arg in logged_messages if "Boom" in arg], - logged_messages) + fileutil.write(os.path.join(basedir2, "private", "collective_dircap"), "URI:DIR2:meow") + self.failUnlessRaises(Boom, client.Client, basedir2) def flush_but_dont_ignore(res): diff --git a/src/allmydata/test/test_magic_folder.py b/src/allmydata/test/test_magic_folder.py index 891f226a..e0396334 100644 --- a/src/allmydata/test/test_magic_folder.py +++ b/src/allmydata/test/test_magic_folder.py @@ -2,10 +2,9 @@ import os, sys from twisted.trial import unittest -from twisted.python import runtime from twisted.internet import defer -from allmydata.interfaces import IDirectoryNode, NoSuchChildError +from allmydata.interfaces import IDirectoryNode from allmydata.util import fake_inotify, fileutil from allmydata.util.encodingutil import get_filesystem_encoding, to_filepath @@ -13,12 +12,15 @@ from allmydata.util.consumer import download_to_data from allmydata.test.no_network import GridTestMixin from allmydata.test.common_util import ReallyEqualMixin, NonASCIIPathMixin from allmydata.test.common import ShouldFailMixin +from .test_cli_magic_folder import MagicFolderCLITestMixin -from allmydata.frontends.magic_folder import MagicFolder +from allmydata.frontends import magic_folder +from allmydata.frontends.magic_folder import MagicFolder, Downloader +from allmydata import backupdb, magicpath from allmydata.util.fileutil import abspath_expanduser_unicode -class MagicFolderTestMixin(GridTestMixin, ShouldFailMixin, ReallyEqualMixin, NonASCIIPathMixin): +class MagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, ReallyEqualMixin, NonASCIIPathMixin): """ These tests will be run both with a mock notifier, and (on platforms that support it) with the real INotify. @@ -28,66 +30,240 @@ class MagicFolderTestMixin(GridTestMixin, ShouldFailMixin, ReallyEqualMixin, Non GridTestMixin.setUp(self) temp = self.mktemp() self.basedir = abspath_expanduser_unicode(temp.decode(get_filesystem_encoding())) - def _get_count(self, name): - return self.stats_provider.get_stats()["counters"].get(name, 0) + self.magicfolder = None + + def _get_count(self, name, client=None): + counters = (client or self.get_client()).stats_provider.get_stats()["counters"] + return counters.get('magic_folder.%s' % (name,), 0) + + def _createdb(self): + dbfile = abspath_expanduser_unicode(u"magicfolderdb.sqlite", base=self.basedir) + bdb = backupdb.get_backupdb(dbfile, create_version=(backupdb.SCHEMA_v3, 3)) + self.failUnless(bdb, "unable to create backupdb from %r" % (dbfile,)) + self.failUnlessEqual(bdb.VERSION, 3) + return bdb + + def _restart_client(self, ign): + #print "_restart_client" + d = self.restart_client() + d.addCallback(self._wait_until_started) + return d - def _test(self): - self.uploader = None + def _wait_until_started(self, ign): + #print "_wait_until_started" + self.magicfolder = self.get_client().getServiceNamed('magic-folder') + return self.magicfolder.ready() + + def test_db_basic(self): + fileutil.make_dirs(self.basedir) + self._createdb() + + def test_db_persistence(self): + """Test that a file upload creates an entry in the database.""" + + fileutil.make_dirs(self.basedir) + db = self._createdb() + + relpath1 = u"myFile1" + pathinfo = fileutil.PathInfo(isdir=False, isfile=True, islink=False, + exists=True, size=1, mtime=123, ctime=456) + db.did_upload_version('URI:LIT:1', relpath1, 0, pathinfo) + + c = db.cursor + c.execute("SELECT size, mtime, ctime" + " FROM local_files" + " WHERE path=?", + (relpath1,)) + row = c.fetchone() + self.failUnlessEqual(row, (pathinfo.size, pathinfo.mtime, pathinfo.ctime)) + + # Second test uses db.is_new_file instead of SQL query directly + # to confirm the previous upload entry in the db. + relpath2 = u"myFile2" + path2 = os.path.join(self.basedir, relpath2) + fileutil.write(path2, "meow\n") + pathinfo = fileutil.get_pathinfo(path2) + db.did_upload_version('URI:LIT:2', relpath2, 0, pathinfo) + self.failUnlessFalse(db.is_new_file(pathinfo, relpath2)) + + different_pathinfo = fileutil.PathInfo(isdir=False, isfile=True, islink=False, + exists=True, size=0, mtime=pathinfo.mtime, ctime=pathinfo.ctime) + self.failUnlessTrue(db.is_new_file(different_pathinfo, relpath2)) + + def test_magicfolder_start_service(self): self.set_up_grid() - self.local_dir = os.path.join(self.basedir, self.unicode_or_fallback(u"loc\u0101l_dir", u"local_dir")) + + self.local_dir = abspath_expanduser_unicode(self.unicode_or_fallback(u"l\u00F8cal_dir", u"local_dir"), + base=self.basedir) self.mkdir_nonascii(self.local_dir) - self.client = self.g.clients[0] - self.stats_provider = self.client.stats_provider + d = defer.succeed(None) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.dirs_monitored'), 0)) - d = self.client.create_dirnode() - def _made_upload_dir(n): - self.failUnless(IDirectoryNode.providedBy(n)) - self.upload_dirnode = n - self.upload_dircap = n.get_uri() - self.uploader = DropUploader(self.client, self.upload_dircap, self.local_dir.encode('utf-8'), - inotify=self.inotify) - return self.uploader.startService() - d.addCallback(_made_upload_dir) + d.addCallback(lambda ign: self.create_invite_join_magic_folder(u"Alice", self.local_dir)) + d.addCallback(self._restart_client) + + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.dirs_monitored'), 1)) + d.addBoth(self.cleanup) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.dirs_monitored'), 0)) + return d + + def test_move_tree(self): + self.set_up_grid() + + self.local_dir = abspath_expanduser_unicode(self.unicode_or_fallback(u"l\u00F8cal_dir", u"local_dir"), + base=self.basedir) + self.mkdir_nonascii(self.local_dir) + + empty_tree_name = self.unicode_or_fallback(u"empty_tr\u00EAe", u"empty_tree") + empty_tree_dir = abspath_expanduser_unicode(empty_tree_name, base=self.basedir) + new_empty_tree_dir = abspath_expanduser_unicode(empty_tree_name, base=self.local_dir) + + small_tree_name = self.unicode_or_fallback(u"small_tr\u00EAe", u"empty_tree") + small_tree_dir = abspath_expanduser_unicode(small_tree_name, base=self.basedir) + new_small_tree_dir = abspath_expanduser_unicode(small_tree_name, base=self.local_dir) + + d = self.create_invite_join_magic_folder(u"Alice", self.local_dir) + d.addCallback(self._restart_client) + + def _check_move_empty_tree(res): + #print "_check_move_empty_tree" + self.mkdir_nonascii(empty_tree_dir) + d2 = self.magicfolder.uploader.set_hook('processed') + os.rename(empty_tree_dir, new_empty_tree_dir) + self.notify(to_filepath(new_empty_tree_dir), self.inotify.IN_MOVED_TO) + return d2 + d.addCallback(_check_move_empty_tree) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded'), 1)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.files_uploaded'), 0)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.directories_created'), 1)) + + def _check_move_small_tree(res): + #print "_check_move_small_tree" + self.mkdir_nonascii(small_tree_dir) + fileutil.write(abspath_expanduser_unicode(u"what", base=small_tree_dir), "say when") + d2 = self.magicfolder.uploader.set_hook('processed', ignore_count=1) + os.rename(small_tree_dir, new_small_tree_dir) + self.notify(to_filepath(new_small_tree_dir), self.inotify.IN_MOVED_TO) + return d2 + d.addCallback(_check_move_small_tree) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded'), 3)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.files_uploaded'), 1)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.directories_created'), 2)) + + def _check_moved_tree_is_watched(res): + #print "_check_moved_tree_is_watched" + d2 = self.magicfolder.uploader.set_hook('processed') + fileutil.write(abspath_expanduser_unicode(u"another", base=new_small_tree_dir), "file") + self.notify(to_filepath(abspath_expanduser_unicode(u"another", base=new_small_tree_dir)), self.inotify.IN_CLOSE_WRITE) + return d2 + d.addCallback(_check_moved_tree_is_watched) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded'), 4)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.files_uploaded'), 2)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.directories_created'), 2)) + + # Files that are moved out of the upload directory should no longer be watched. + #def _move_dir_away(ign): + # os.rename(new_empty_tree_dir, empty_tree_dir) + # # Wuh? Why don't we get this event for the real test? + # #self.notify(to_filepath(new_empty_tree_dir), self.inotify.IN_MOVED_FROM) + #d.addCallback(_move_dir_away) + #def create_file(val): + # test_file = abspath_expanduser_unicode(u"what", base=empty_tree_dir) + # fileutil.write(test_file, "meow") + # #self.notify(...) + # return + #d.addCallback(create_file) + #d.addCallback(lambda ign: time.sleep(1)) # XXX ICK + #d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0)) + #d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded'), 4)) + #d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.files_uploaded'), 2)) + #d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0)) + #d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.directories_created'), 2)) + + d.addBoth(self.cleanup) + return d + + def test_persistence(self): + """ + Perform an upload of a given file and then stop the client. + Start a new client and magic-folder service... and verify that the file is NOT uploaded + a second time. This test is meant to test the database persistence along with + the startup and shutdown code paths of the magic-folder service. + """ + self.set_up_grid() + self.local_dir = abspath_expanduser_unicode(u"test_persistence", base=self.basedir) + self.mkdir_nonascii(self.local_dir) + self.collective_dircap = "" + + d = defer.succeed(None) + d.addCallback(lambda ign: self.create_invite_join_magic_folder(u"Alice", self.local_dir)) + d.addCallback(self._restart_client) + + def create_test_file(filename): + d2 = self.magicfolder.uploader.set_hook('processed') + test_file = abspath_expanduser_unicode(filename, base=self.local_dir) + fileutil.write(test_file, "meow %s" % filename) + self.notify(to_filepath(test_file), self.inotify.IN_CLOSE_WRITE) + return d2 + d.addCallback(lambda ign: create_test_file(u"what1")) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded'), 1)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0)) + d.addCallback(self.cleanup) + + d.addCallback(self._restart_client) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded'), 1)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0)) + d.addCallback(lambda ign: create_test_file(u"what2")) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded'), 2)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0)) + d.addBoth(self.cleanup) + return d + + def test_magic_folder(self): + self.set_up_grid() + self.local_dir = os.path.join(self.basedir, self.unicode_or_fallback(u"loc\u0101l_dir", u"local_dir")) + self.mkdir_nonascii(self.local_dir) + + d = self.create_invite_join_magic_folder(u"Alice\u0101", self.local_dir) + d.addCallback(self._restart_client) # Write something short enough for a LIT file. - d.addCallback(lambda ign: self._test_file(u"short", "test")) + d.addCallback(lambda ign: self._check_file(u"short", "test")) # Write to the same file again with different data. - d.addCallback(lambda ign: self._test_file(u"short", "different")) + d.addCallback(lambda ign: self._check_file(u"short", "different")) # Test that temporary files are not uploaded. - d.addCallback(lambda ign: self._test_file(u"tempfile", "test", temporary=True)) + d.addCallback(lambda ign: self._check_file(u"tempfile", "test", temporary=True)) # Test that we tolerate creation of a subdirectory. d.addCallback(lambda ign: os.mkdir(os.path.join(self.local_dir, u"directory"))) # Write something longer, and also try to test a Unicode name if the fs can represent it. name_u = self.unicode_or_fallback(u"l\u00F8ng", u"long") - d.addCallback(lambda ign: self._test_file(name_u, "test"*100)) + d.addCallback(lambda ign: self._check_file(name_u, "test"*100)) # TODO: test that causes an upload failure. - d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_failed'), 0)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0)) - # Prevent unclean reactor errors. - def _cleanup(res): - d = defer.succeed(None) - if self.uploader is not None: - d.addCallback(lambda ign: self.uploader.finish(for_tests=True)) - d.addCallback(lambda ign: res) - return d - d.addBoth(_cleanup) + d.addBoth(self.cleanup) return d - def _test_file(self, name_u, data, temporary=False): - previously_uploaded = self._get_count('drop_upload.files_uploaded') - previously_disappeared = self._get_count('drop_upload.files_disappeared') - - d = defer.Deferred() + def _check_file(self, name_u, data, temporary=False): + previously_uploaded = self._get_count('uploader.objects_succeeded') + previously_disappeared = self._get_count('uploader.objects_disappeared') - # Note: this relies on the fact that we only get one IN_CLOSE_WRITE notification per file - # (otherwise we would get a defer.AlreadyCalledError). Should we be relying on that? - self.uploader.set_uploaded_callback(d.callback) + d = self.magicfolder.uploader.set_hook('processed') path_u = abspath_expanduser_unicode(name_u, base=self.local_dir) path = to_filepath(path_u) @@ -103,28 +279,150 @@ class MagicFolderTestMixin(GridTestMixin, ShouldFailMixin, ReallyEqualMixin, Non f.close() if temporary and sys.platform == "win32": os.unlink(path_u) + self.notify(path, self.inotify.IN_DELETE) fileutil.flush_volume(path_u) - self.notify_close_write(path) + self.notify(path, self.inotify.IN_CLOSE_WRITE) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0)) if temporary: - d.addCallback(lambda ign: self.shouldFail(NoSuchChildError, 'temp file not uploaded', None, - self.upload_dirnode.get, name_u)) - d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_disappeared'), + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_disappeared'), previously_disappeared + 1)) else: d.addCallback(lambda ign: self.upload_dirnode.get(name_u)) d.addCallback(download_to_data) d.addCallback(lambda actual_data: self.failUnlessReallyEqual(actual_data, data)) - d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_uploaded'), + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded'), previously_uploaded + 1)) - d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_queued'), 0)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0)) + return d + + def _check_version_in_dmd(self, magicfolder, relpath_u, expected_version): + encoded_name_u = magicpath.path2magic(relpath_u) + d = magicfolder.downloader._get_collective_latest_file(encoded_name_u) + def check_latest(result): + if result[0] is not None: + node, metadata = result + d.addCallback(lambda ign: self.failUnlessEqual(metadata['version'], expected_version)) + d.addCallback(check_latest) return d + def _check_version_in_local_db(self, magicfolder, relpath_u, expected_version): + version = magicfolder._db.get_local_file_version(relpath_u) + #print "_check_version_in_local_db: %r has version %s" % (relpath_u, version) + self.failUnlessEqual(version, expected_version) + + def test_alice_bob(self): + d = self.setup_alice_and_bob() + def get_results(result): + # XXX are these used? + (self.alice_collective_dircap, self.alice_upload_dircap, self.alice_magicfolder, + self.bob_collective_dircap, self.bob_upload_dircap, self.bob_magicfolder) = result + #print "Alice magicfolderdb is at %r" % (self.alice_magicfolder._client.basedir) + #print "Bob magicfolderdb is at %r" % (self.bob_magicfolder._client.basedir) + d.addCallback(get_results) + + def Alice_write_a_file(result): + print "Alice writes a file\n" + self.file_path = abspath_expanduser_unicode(u"file1", base=self.alice_magicfolder.uploader._local_path_u) + fileutil.write(self.file_path, "meow, meow meow. meow? meow meow! meow.") + self.magicfolder = self.alice_magicfolder + self.notify(to_filepath(self.file_path), self.inotify.IN_CLOSE_WRITE) + + d.addCallback(Alice_write_a_file) + + def Alice_wait_for_upload(result): + print "Alice waits for an upload\n" + d2 = self.alice_magicfolder.uploader.set_hook('processed') + return d2 + d.addCallback(Alice_wait_for_upload) + d.addCallback(lambda ign: self._check_version_in_dmd(self.alice_magicfolder, u"file1", 0)) + d.addCallback(lambda ign: self._check_version_in_local_db(self.alice_magicfolder, u"file1", 0)) + + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded', client=self.alice_magicfolder._client), 1)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.files_uploaded', client=self.alice_magicfolder._client), 1)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued', client=self.alice_magicfolder._client), 0)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.directories_created', client=self.alice_magicfolder._client), 0)) + + def Bob_wait_for_download(result): + print "Bob waits for a download\n" + d2 = self.bob_magicfolder.downloader.set_hook('processed') + return d2 + d.addCallback(Bob_wait_for_download) + d.addCallback(lambda ign: self._check_version_in_local_db(self.bob_magicfolder, u"file1", 0)) + d.addCallback(lambda ign: self._check_version_in_dmd(self.bob_magicfolder, u"file1", 0)) # XXX prolly not needed + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('downloader.objects_failed'), 0)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('downloader.objects_downloaded', client=self.bob_magicfolder._client), 1)) + + + # test deletion of file behavior + def Alice_delete_file(result): + print "Alice deletes the file!\n" + os.unlink(self.file_path) + self.notify(to_filepath(self.file_path), self.inotify.IN_DELETE) + + return None + d.addCallback(Alice_delete_file) + d.addCallback(Alice_wait_for_upload) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded', client=self.alice_magicfolder._client), 2)) + d.addCallback(lambda ign: self._check_version_in_dmd(self.alice_magicfolder, u"file1", 1)) + d.addCallback(lambda ign: self._check_version_in_local_db(self.alice_magicfolder, u"file1", 1)) + + d.addCallback(Bob_wait_for_download) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('downloader.objects_failed'), 0)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('downloader.objects_downloaded', client=self.bob_magicfolder._client), 2)) + d.addCallback(lambda ign: self._check_version_in_local_db(self.bob_magicfolder, u"file1", 1)) + d.addCallback(lambda ign: self._check_version_in_dmd(self.bob_magicfolder, u"file1", 1)) + + def Alice_rewrite_file(result): + print "Alice rewrites file\n" + self.file_path = abspath_expanduser_unicode(u"file1", base=self.alice_magicfolder.uploader._local_path_u) + fileutil.write(self.file_path, "Alice suddenly sees the white rabbit running into the forest.") + self.magicfolder = self.alice_magicfolder + self.notify(to_filepath(self.file_path), self.inotify.IN_CLOSE_WRITE) + + d.addCallback(Alice_rewrite_file) + d.addCallback(lambda ign: self._check_version_in_local_db(self.bob_magicfolder, u"file1", 1)) + d.addCallback(Alice_wait_for_upload) + + d.addCallback(lambda ign: self._check_version_in_dmd(self.alice_magicfolder, u"file1", 2)) + d.addCallback(lambda ign: self._check_version_in_local_db(self.alice_magicfolder, u"file1", 2)) + + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded', client=self.alice_magicfolder._client), 3)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.files_uploaded', client=self.alice_magicfolder._client), 3)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued', client=self.alice_magicfolder._client), 0)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.directories_created', client=self.alice_magicfolder._client), 0)) + + d.addCallback(Bob_wait_for_download) + d.addCallback(lambda ign: self._check_version_in_dmd(self.bob_magicfolder, u"file1", 2)) + d.addCallback(lambda ign: self._check_version_in_local_db(self.bob_magicfolder, u"file1", 2)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('downloader.objects_failed'), 0)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('downloader.objects_downloaded', client=self.bob_magicfolder._client), 3)) + + def cleanup_Alice_and_Bob(result): + print "cleanup alice bob test\n" + d = defer.succeed(None) + d.addCallback(lambda ign: self.alice_magicfolder.finish()) + d.addCallback(lambda ign: self.bob_magicfolder.finish()) + d.addCallback(lambda ign: result) + return d + d.addCallback(cleanup_Alice_and_Bob) + return d class MockTest(MagicFolderTestMixin, unittest.TestCase): """This can run on any platform, and even if twisted.internet.inotify can't be imported.""" + def setUp(self): + MagicFolderTestMixin.setUp(self) + self.inotify = fake_inotify + self.patch(magic_folder, 'get_inotify_module', lambda: self.inotify) + + def notify(self, path, mask): + self.magicfolder.uploader._notifier.event(path, mask) + def test_errors(self): self.set_up_grid() @@ -137,44 +435,78 @@ class MockTest(MagicFolderTestMixin, unittest.TestCase): client = self.g.clients[0] d = client.create_dirnode() - def _made_upload_dir(n): + def _check_errors(n): self.failUnless(IDirectoryNode.providedBy(n)) upload_dircap = n.get_uri() readonly_dircap = n.get_readonly_uri() self.shouldFail(AssertionError, 'nonexistent local.directory', 'there is no directory', - MagicFolder, client, upload_dircap, doesnotexist, inotify=fake_inotify) + MagicFolder, client, upload_dircap, '', doesnotexist, magicfolderdb) self.shouldFail(AssertionError, 'non-directory local.directory', 'is not a directory', - MagicFolder, client, upload_dircap, not_a_dir, inotify=fake_inotify) + MagicFolder, client, upload_dircap, '', not_a_dir, magicfolderdb) self.shouldFail(AssertionError, 'bad upload.dircap', 'does not refer to a directory', - MagicFolder, client, 'bad', errors_dir, inotify=fake_inotify) + MagicFolder, client, 'bad', '', errors_dir, magicfolderdb) self.shouldFail(AssertionError, 'non-directory upload.dircap', 'does not refer to a directory', - MagicFolder, client, 'URI:LIT:foo', errors_dir, inotify=fake_inotify) + MagicFolder, client, 'URI:LIT:foo', '', errors_dir, magicfolderdb) self.shouldFail(AssertionError, 'readonly upload.dircap', 'is not a writecap to a directory', - MagicFolder, client, readonly_dircap, errors_dir, inotify=fake_inotify) - d.addCallback(_made_upload_dir) + MagicFolder, client, readonly_dircap, '', errors_dir, magicfolderdb,) + self.shouldFail(AssertionError, 'collective dircap', + "The URI in 'private/collective_dircap' is not a readonly cap to a directory.", + MagicFolder, client, upload_dircap, upload_dircap, errors_dir, magicfolderdb) + + def _not_implemented(): + raise NotImplementedError("blah") + self.patch(magic_folder, 'get_inotify_module', _not_implemented) + self.shouldFail(NotImplementedError, 'unsupported', 'blah', + MagicFolder, client, upload_dircap, '', errors_dir, magicfolderdb) + d.addCallback(_check_errors) return d - def test_drop_upload(self): - self.inotify = fake_inotify - self.basedir = "drop_upload.MockTest.test_drop_upload" - return self._test() + def test_write_downloaded_file(self): + workdir = u"cli/MagicFolder/write-downloaded-file" + local_file = fileutil.abspath_expanduser_unicode(os.path.join(workdir, "foobar")) + + # create a file with name "foobar" with content "foo" + # write downloaded file content "bar" into "foobar" with is_conflict = False + fileutil.make_dirs(workdir) + fileutil.write(local_file, "foo") + + # if is_conflict is False, then the .conflict file shouldn't exist. + Downloader._write_downloaded_file(local_file, "bar", False, None) + conflicted_path = local_file + u".conflict" + self.failIf(os.path.exists(conflicted_path)) - def notify_close_write(self, path): - self.uploader._notifier.event(path, self.inotify.IN_CLOSE_WRITE) + # At this point, the backup file should exist with content "foo" + backup_path = local_file + u".backup" + self.failUnless(os.path.exists(backup_path)) + self.failUnlessEqual(fileutil.read(backup_path), "foo") + + # .tmp file shouldn't exist + self.failIf(os.path.exists(local_file + u".tmp")) + + # .. and the original file should have the new content + self.failUnlessEqual(fileutil.read(local_file), "bar") + + # now a test for conflicted case + Downloader._write_downloaded_file(local_file, "bar", True, None) + self.failUnless(os.path.exists(conflicted_path)) + + # .tmp file shouldn't exist + self.failIf(os.path.exists(local_file + u".tmp")) class RealTest(MagicFolderTestMixin, unittest.TestCase): """This is skipped unless both Twisted and the platform support inotify.""" - def test_drop_upload(self): - self.inotify = None # use the appropriate inotify for the platform - self.basedir = "drop_upload.RealTest.test_drop_upload" - return self._test() + def setUp(self): + MagicFolderTestMixin.setUp(self) + self.inotify = magic_folder.get_inotify_module() - def notify_close_write(self, path): - # Writing to the file causes the notification. + def notify(self, path, mask): + # Writing to the filesystem causes the notification. pass -if sys.platform != "win32" and not runtime.platform.supportsINotify(): +try: + magic_folder.get_inotify_module() +except NotImplementedError: RealTest.skip = "Magic Folder support can only be tested for-real on an OS that supports inotify or equivalent." diff --git a/src/allmydata/test/test_magicpath.py b/src/allmydata/test/test_magicpath.py new file mode 100644 index 00000000..1227a2c4 --- /dev/null +++ b/src/allmydata/test/test_magicpath.py @@ -0,0 +1,28 @@ + +from twisted.trial import unittest + +from allmydata import magicpath + + +class MagicPath(unittest.TestCase): + tests = { + u"Documents/work/critical-project/qed.txt": u"Documents@_work@_critical-project@_qed.txt", + u"Documents/emails/bunnyfufu@hoppingforest.net": u"Documents@_emails@_bunnyfufu@@hoppingforest.net", + u"foo/@/bar": u"foo@_@@@_bar", + } + + def test_path2magic(self): + for test, expected in self.tests.items(): + self.failUnlessEqual(magicpath.path2magic(test), expected) + + def test_magic2path(self): + for expected, test in self.tests.items(): + self.failUnlessEqual(magicpath.magic2path(test), expected) + + def test_should_ignore(self): + self.failUnlessEqual(magicpath.should_ignore_file(u".bashrc"), True) + self.failUnlessEqual(magicpath.should_ignore_file(u"bashrc."), False) + self.failUnlessEqual(magicpath.should_ignore_file(u"forest/tree/branch/.bashrc"), True) + self.failUnlessEqual(magicpath.should_ignore_file(u"forest/tree/.branch/bashrc"), True) + self.failUnlessEqual(magicpath.should_ignore_file(u"forest/.tree/branch/bashrc"), True) + self.failUnlessEqual(magicpath.should_ignore_file(u"forest/tree/branch/bashrc"), False) diff --git a/src/allmydata/test/test_util.py b/src/allmydata/test/test_util.py index db64bf19..4b7354e9 100644 --- a/src/allmydata/test/test_util.py +++ b/src/allmydata/test/test_util.py @@ -441,6 +441,74 @@ class FileUtil(ReallyEqualMixin, unittest.TestCase): self.failIf(os.path.exists(fn)) self.failUnless(os.path.exists(fn2)) + def test_rename_no_overwrite(self): + workdir = fileutil.abspath_expanduser_unicode(u"test_rename_no_overwrite") + fileutil.make_dirs(workdir) + + source_path = os.path.join(workdir, "source") + dest_path = os.path.join(workdir, "dest") + + # when neither file exists + self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path) + + # when only dest exists + fileutil.write(dest_path, "dest") + self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path) + self.failUnlessEqual(fileutil.read(dest_path), "dest") + + # when both exist + fileutil.write(source_path, "source") + self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path) + self.failUnlessEqual(fileutil.read(source_path), "source") + self.failUnlessEqual(fileutil.read(dest_path), "dest") + + # when only source exists + os.remove(dest_path) + fileutil.rename_no_overwrite(source_path, dest_path) + self.failUnlessEqual(fileutil.read(dest_path), "source") + self.failIf(os.path.exists(source_path)) + + def test_replace_file(self): + workdir = fileutil.abspath_expanduser_unicode(u"test_replace_file") + fileutil.make_dirs(workdir) + + backup_path = os.path.join(workdir, "backup") + replaced_path = os.path.join(workdir, "replaced") + replacement_path = os.path.join(workdir, "replacement") + + # when none of the files exist + self.failUnlessRaises(fileutil.ConflictError, fileutil.replace_file, replaced_path, replacement_path, backup_path) + + # when only replaced exists + fileutil.write(replaced_path, "foo") + self.failUnlessRaises(fileutil.ConflictError, fileutil.replace_file, replaced_path, replacement_path, backup_path) + self.failUnlessEqual(fileutil.read(replaced_path), "foo") + + # when both replaced and replacement exist, but not backup + fileutil.write(replacement_path, "bar") + fileutil.replace_file(replaced_path, replacement_path, backup_path) + self.failUnlessEqual(fileutil.read(backup_path), "foo") + self.failUnlessEqual(fileutil.read(replaced_path), "bar") + self.failIf(os.path.exists(replacement_path)) + + # when only replacement exists + os.remove(backup_path) + os.remove(replaced_path) + fileutil.write(replacement_path, "bar") + fileutil.replace_file(replaced_path, replacement_path, backup_path) + self.failUnlessEqual(fileutil.read(replaced_path), "bar") + self.failIf(os.path.exists(replacement_path)) + self.failIf(os.path.exists(backup_path)) + + # when replaced, replacement and backup all exist + fileutil.write(replaced_path, "foo") + fileutil.write(replacement_path, "bar") + fileutil.write(backup_path, "bak") + fileutil.replace_file(replaced_path, replacement_path, backup_path) + self.failUnlessEqual(fileutil.read(backup_path), "foo") + self.failUnlessEqual(fileutil.read(replaced_path), "bar") + self.failIf(os.path.exists(replacement_path)) + def test_du(self): basedir = "util/FileUtil/test_du" fileutil.make_dirs(basedir) @@ -567,6 +635,50 @@ class FileUtil(ReallyEqualMixin, unittest.TestCase): disk = fileutil.get_disk_stats('.', 2**128) self.failUnlessEqual(disk['avail'], 0) + def test_get_pathinfo(self): + basedir = "util/FileUtil/test_get_pathinfo" + fileutil.make_dirs(basedir) + + # create a directory + self.mkdir(basedir, "a") + dirinfo = fileutil.get_pathinfo(basedir) + self.failUnlessTrue(dirinfo.isdir) + self.failUnlessTrue(dirinfo.exists) + self.failUnlessFalse(dirinfo.isfile) + self.failUnlessFalse(dirinfo.islink) + + # create a file under the directory + f = os.path.join(basedir, "a", "1.txt") + self.touch(basedir, "a/1.txt", data="a"*10) + fileinfo = fileutil.get_pathinfo(f) + self.failUnlessTrue(fileinfo.isfile) + self.failUnlessTrue(fileinfo.exists) + self.failUnlessFalse(fileinfo.isdir) + self.failUnlessFalse(fileinfo.islink) + self.failUnlessEqual(fileinfo.size, 10) + + # create a symlink under the directory a pointing to 1.txt + slname = os.path.join(basedir, "a", "linkto1.txt") + os.symlink(f, slname) + symlinkinfo = fileutil.get_pathinfo(slname) + self.failUnlessTrue(symlinkinfo.islink) + self.failUnlessTrue(symlinkinfo.exists) + self.failUnlessFalse(symlinkinfo.isfile) + self.failUnlessFalse(symlinkinfo.isdir) + + # path at which nothing exists + dnename = os.path.join(basedir, "a", "doesnotexist") + now = time.time() + dneinfo = fileutil.get_pathinfo(dnename, now=now) + self.failUnlessFalse(dneinfo.exists) + self.failUnlessFalse(dneinfo.isfile) + self.failUnlessFalse(dneinfo.isdir) + self.failUnlessFalse(dneinfo.islink) + self.failUnlessEqual(dneinfo.size, None) + self.failUnlessEqual(dneinfo.mtime, now) + self.failUnlessEqual(dneinfo.ctime, now) + + class PollMixinTests(unittest.TestCase): def setUp(self): self.pm = pollmixin.PollMixin() diff --git a/src/allmydata/util/deferredutil.py b/src/allmydata/util/deferredutil.py index 989e85e8..9b37cb27 100644 --- a/src/allmydata/util/deferredutil.py +++ b/src/allmydata/util/deferredutil.py @@ -5,6 +5,7 @@ from foolscap.api import eventually, fireEventually from twisted.internet import defer, reactor from allmydata.util import log +from allmydata.util.assertutil import _assert from allmydata.util.pollmixin import PollMixin @@ -77,28 +78,35 @@ class HookMixin: I am a helper mixin that maintains a collection of named hooks, primarily for use in tests. Each hook is set to an unfired Deferred using 'set_hook', and can then be fired exactly once at the appropriate time by '_call_hook'. + If 'ignore_count' is given, that number of calls to '_call_hook' will be + ignored before firing the hook. I assume a '_hooks' attribute that should set by the class constructor to a dict mapping each valid hook name to None. """ - def set_hook(self, name, d=None): + def set_hook(self, name, d=None, ignore_count=0): """ Called by the hook observer (e.g. by a test). If d is not given, an unfired Deferred is created and returned. The hook must not already be set. """ + self._log("set_hook %r, ignore_count=%r" % (name, ignore_count)) if d is None: d = defer.Deferred() - assert self._hooks[name] is None, self._hooks[name] - assert isinstance(d, defer.Deferred), d - self._hooks[name] = d + _assert(ignore_count >= 0, ignore_count=ignore_count) + _assert(name in self._hooks, name=name) + _assert(self._hooks[name] is None, name=name, hook=self._hooks[name]) + _assert(isinstance(d, defer.Deferred), d=d) + + self._hooks[name] = (d, ignore_count) return d def _call_hook(self, res, name): """ - Called to trigger the hook, with argument 'res'. This is a no-op if the - hook is unset. Otherwise, the hook will be unset, and then its Deferred - will be fired synchronously. + Called to trigger the hook, with argument 'res'. This is a no-op if + the hook is unset. If the hook's ignore_count is positive, it will be + decremented; if it was already zero, the hook will be unset, and then + its Deferred will be fired synchronously. The expected usage is "deferred.addBoth(self._call_hook, 'hookname')". This ensures that if 'res' is a failure, the hook will be errbacked, @@ -106,13 +114,22 @@ class HookMixin: 'res' is returned so that the current result or failure will be passed through. """ - d = self._hooks[name] - if d is None: + hook = self._hooks[name] + if hook is None: return defer.succeed(None) - self._hooks[name] = None - _with_log(d.callback, res) + + (d, ignore_count) = hook + self._log("call_hook %r, ignore_count=%r" % (name, ignore_count)) + if ignore_count > 0: + self._hooks[name] = (d, ignore_count - 1) + else: + self._hooks[name] = None + _with_log(d.callback, res) return res + def _log(self, msg): + log.msg(msg, level=log.NOISY) + def async_iterate(process, iterable, *extra_args, **kwargs): """ diff --git a/src/allmydata/util/encodingutil.py b/src/allmydata/util/encodingutil.py index ae54afb9..a7a2bfef 100644 --- a/src/allmydata/util/encodingutil.py +++ b/src/allmydata/util/encodingutil.py @@ -6,7 +6,7 @@ unicode and back. import sys, os, re, locale from types import NoneType -from allmydata.util.assertutil import precondition +from allmydata.util.assertutil import precondition, _assert from twisted.python import usage from twisted.python.filepath import FilePath from allmydata.util import log @@ -63,7 +63,11 @@ def _reload(): is_unicode_platform = sys.platform in ["win32", "darwin"] - use_unicode_filepath = sys.platform == "win32" or hasattr(FilePath, '_asTextPath') + # Despite the Unicode-mode FilePath support added to Twisted in + # , we can't yet use + # Unicode-mode FilePaths with INotify on non-Windows platforms + # due to . + use_unicode_filepath = sys.platform == "win32" _reload() @@ -249,6 +253,22 @@ def quote_local_unicode_path(path, quotemarks=True): return quote_output(path, quotemarks=quotemarks, quote_newlines=True) +def quote_filepath(path, quotemarks=True): + return quote_local_unicode_path(unicode_from_filepath(path), quotemarks=quotemarks) + +def extend_filepath(fp, segments): + # We cannot use FilePath.preauthChild, because + # * it has the security flaw described in ; + # * it may return a FilePath in the wrong mode. + + for segment in segments: + fp = fp.child(segment) + + if isinstance(fp.path, unicode) and not use_unicode_filepath: + return FilePath(fp.path.encode(filesystem_encoding)) + else: + return fp + def to_filepath(path): precondition(isinstance(path, basestring), path=path) @@ -257,15 +277,28 @@ def to_filepath(path): return FilePath(path) +def _decode(s): + precondition(isinstance(s, basestring), s=s) + + if isinstance(s, bytes): + return s.decode(filesystem_encoding) + else: + return s + def unicode_from_filepath(fp): precondition(isinstance(fp, FilePath), fp=fp) + return _decode(fp.path) - path = fp.path - if isinstance(path, bytes): - path = path.decode(filesystem_encoding) - - return path +def unicode_segments_from(base_fp, ancestor_fp): + precondition(isinstance(base_fp, FilePath), base_fp=base_fp) + precondition(isinstance(ancestor_fp, FilePath), ancestor_fp=ancestor_fp) + if hasattr(FilePath, 'asTextMode'): + return base_fp.asTextMode().segmentsFrom(ancestor_fp.asTextMode()) + else: + bpt, apt = (type(base_fp.path), type(ancestor_fp.path)) + _assert(bpt == apt, bpt=bpt, apt=apt) + return map(_decode, base_fp.segmentsFrom(ancestor_fp)) def unicode_platform(): """ @@ -313,3 +346,6 @@ def listdir_unicode(path): return os.listdir(path) else: return listdir_unicode_fallback(path) + +def listdir_filepath(fp): + return listdir_unicode(unicode_from_filepath(fp)) diff --git a/src/allmydata/util/fileutil.py b/src/allmydata/util/fileutil.py index 54e34484..8194fcd4 100644 --- a/src/allmydata/util/fileutil.py +++ b/src/allmydata/util/fileutil.py @@ -3,6 +3,8 @@ Futz with files like a pro. """ import sys, exceptions, os, stat, tempfile, time, binascii +from collections import namedtuple +from errno import ENOENT from twisted.python import log @@ -518,8 +520,7 @@ def get_available_space(whichdir, reserved_space): if sys.platform == "win32": - from ctypes import WINFUNCTYPE, windll, WinError - from ctypes.wintypes import BOOL, HANDLE, DWORD, LPCWSTR, LPVOID + from ctypes.wintypes import BOOL, HANDLE, DWORD, LPCWSTR, LPVOID, WinError, get_last_error # CreateFileW = WINFUNCTYPE(HANDLE, LPCWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE) \ @@ -560,3 +561,98 @@ else: def flush_volume(path): # use sync()? pass + + +class ConflictError(Exception): + pass + +class UnableToUnlinkReplacementError(Exception): + pass + +def reraise(wrapper): + _, exc, tb = sys.exc_info() + wrapper_exc = wrapper("%s: %s" % (exc.__class__.__name__, exc)) + raise wrapper_exc.__class__, wrapper_exc, tb + +if sys.platform == "win32": + from ctypes import WINFUNCTYPE, windll, WinError, get_last_error + from ctypes.wintypes import BOOL, DWORD, LPCWSTR, LPVOID + + # + ReplaceFileW = WINFUNCTYPE( + BOOL, + LPCWSTR, LPCWSTR, LPCWSTR, DWORD, LPVOID, LPVOID, + use_last_error=True + )(("ReplaceFileW", windll.kernel32)) + + REPLACEFILE_IGNORE_MERGE_ERRORS = 0x00000002 + + def rename_no_overwrite(source_path, dest_path): + os.rename(source_path, dest_path) + + def replace_file(replaced_path, replacement_path, backup_path): + precondition_abspath(replaced_path) + precondition_abspath(replacement_path) + precondition_abspath(backup_path) + + r = ReplaceFileW(replaced_path, replacement_path, backup_path, + REPLACEFILE_IGNORE_MERGE_ERRORS, None, None) + if r == 0: + # The UnableToUnlinkReplacementError case does not happen on Windows; + # all errors should be treated as signalling a conflict. + err = get_last_error() + raise ConflictError("WinError: %s" % (WinError(err))) +else: + def rename_no_overwrite(source_path, dest_path): + # link will fail with EEXIST if there is already something at dest_path. + os.link(source_path, dest_path) + try: + os.unlink(source_path) + except EnvironmentError: + reraise(UnableToUnlinkReplacementError) + + def replace_file(replaced_path, replacement_path, backup_path): + precondition_abspath(replaced_path) + precondition_abspath(replacement_path) + precondition_abspath(backup_path) + + if not os.path.exists(replacement_path): + raise ConflictError("Replacement file not found: %r" % (replacement_path,)) + + try: + os.rename(replaced_path, backup_path) + except OSError as e: + if e.errno != ENOENT: + raise + try: + rename_no_overwrite(replacement_path, replaced_path) + except EnvironmentError: + reraise(ConflictError) + +PathInfo = namedtuple('PathInfo', 'isdir isfile islink exists size mtime ctime') + +def get_pathinfo(path_u, now=None): + try: + statinfo = os.lstat(path_u) + mode = statinfo.st_mode + return PathInfo(isdir =stat.S_ISDIR(mode), + isfile=stat.S_ISREG(mode), + islink=stat.S_ISLNK(mode), + exists=True, + size =statinfo.st_size, + mtime =statinfo.st_mtime, + ctime =statinfo.st_ctime, + ) + except OSError as e: + if e.errno == ENOENT: + if now is None: + now = time.time() + return PathInfo(isdir =False, + isfile=False, + islink=False, + exists=False, + size =None, + mtime =now, + ctime =now, + ) + raise