CREATE TABLE local_files
(
path VARCHAR(1024) PRIMARY KEY, -- index, this is an absolute UTF-8-encoded local filename
- size INTEGER, -- os.stat(fn)[stat.ST_SIZE]
+ -- note that size is before mtime and ctime here, but after in function parameters
+ size INTEGER, -- os.stat(fn)[stat.ST_SIZE] (NULL if the file has been deleted)
mtime NUMBER, -- os.stat(fn)[stat.ST_MTIME]
ctime NUMBER, -- os.stat(fn)[stat.ST_CTIME]
fileid INTEGER%s
is not healthy, please upload the file and call r.did_upload(filecap)
when you're done.
- If use_timestamps=True (the default), I will compare ctime and mtime
+ If use_timestamps=True (the default), I will compare mtime and ctime
of the local file against an entry in my database, and consider the
- file to be unchanged if ctime, mtime, and filesize are all the same
+ file to be unchanged if mtime, ctime, and filesize are all the same
as the earlier version. If use_timestamps=False, I will not trust the
timestamps, so more files (perhaps all) will be marked as needing
upload. A future version of this database may hash the file to make
"""
path = abspath_expanduser_unicode(path)
+
+ # XXX consider using get_pathinfo
s = os.stat(path)
size = s[stat.ST_SIZE]
- ctime = s[stat.ST_CTIME]
mtime = s[stat.ST_MTIME]
+ ctime = s[stat.ST_CTIME]
now = time.time()
c = self.cursor
class MagicFolderDB(BackupDB):
VERSION = 3
- def get_all_files(self):
- """Retreive a list of all files that have had an entry in magic-folder db
- (files that have been downloaded at least once).
+ def get_all_relpaths(self):
+ """
+ Retrieve a list of all relpaths of files that have had an entry in magic folder db
+ (i.e. that have been downloaded at least once).
"""
self.cursor.execute("SELECT path FROM local_files")
rows = self.cursor.fetchall()
- if not rows:
- return None
- else:
- return rows
+ return set([r[0] for r in rows])
- def get_local_file_version(self, path):
- """I will tell you the version of a local file tracked by our magic folder db.
- If no db entry found then I'll return None.
+ def get_local_file_version(self, relpath_u):
+ """
+ Return the version of a local file tracked by our magic folder db.
+ If no db entry is found then return None.
"""
c = self.cursor
c.execute("SELECT version, fileid"
" FROM local_files"
" WHERE path=?",
- (path,))
+ (relpath_u,))
row = self.cursor.fetchone()
if not row:
return None
else:
return row[0]
- def did_upload_file(self, filecap, path, version, mtime, ctime, size):
- #print "_did_upload_file(%r, %r, %r, %r, %r, %r)" % (filecap, path, version, mtime, ctime, size)
+ def did_upload_version(self, filecap, relpath_u, version, pathinfo):
+ #print "did_upload_version(%r, %r, %r, %r)" % (filecap, relpath_u, version, pathinfo)
now = time.time()
fileid = self.get_or_allocate_fileid_for_cap(filecap)
try:
(now, now, fileid))
try:
self.cursor.execute("INSERT INTO local_files VALUES (?,?,?,?,?,?)",
- (path, size, mtime, ctime, fileid, version))
+ (relpath_u, pathinfo.size, pathinfo.mtime, pathinfo.ctime, fileid, version))
except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
self.cursor.execute("UPDATE local_files"
" SET size=?, mtime=?, ctime=?, fileid=?, version=?"
" WHERE path=?",
- (size, mtime, ctime, fileid, version, path))
+ (pathinfo.size, pathinfo.mtime, pathinfo.ctime, fileid, version, relpath_u))
self.connection.commit()
- def is_new_file_time(self, path, relpath_u):
- """recent_file_time returns true if the file is recent...
- meaning its current statinfo (i.e. size, ctime, and mtime) matched the statinfo
- that was previously stored in the db.
+ def is_new_file(self, pathinfo, relpath_u):
"""
- #print "check_file_time %s %s" % (path, relpath_u)
- path = abspath_expanduser_unicode(path)
- s = os.stat(path)
- size = s[stat.ST_SIZE]
- ctime = s[stat.ST_CTIME]
- mtime = s[stat.ST_MTIME]
+ Returns true if the file's current pathinfo (size, mtime, and ctime) has
+ changed from the pathinfo previously stored in the db.
+ """
+ #print "is_new_file(%r, %r)" % (pathinfo, relpath_u)
c = self.cursor
- c.execute("SELECT size,mtime,ctime,fileid"
+ c.execute("SELECT size, mtime, ctime"
" FROM local_files"
" WHERE path=?",
(relpath_u,))
row = self.cursor.fetchone()
if not row:
return True
- (last_size,last_mtime,last_ctime,last_fileid) = row
- if (size, ctime, mtime) == (last_size, last_ctime, last_mtime):
- return False
- else:
- return True
+ return (pathinfo.size, pathinfo.mtime, pathinfo.ctime) != row
}
def __init__(self, basedir="."):
+ #print "Client.__init__(%r)" % (basedir,)
node.Node.__init__(self, basedir)
self.connected_enough_d = defer.Deferred()
self.started_timestamp = time.time()
from allmydata.frontends import magic_folder
- s = magic_folder.MagicFolder(self, upload_dircap, local_dir, dbfile)
+ s = magic_folder.MagicFolder(self, upload_dircap, collective_dircap, local_dir, dbfile)
s.setServiceParent(self)
s.startService()
-import sys
+import sys, os
+import os.path
+from collections import deque
+import time
-from twisted.internet import defer
-from twisted.python.filepath import FilePath
+from twisted.internet import defer, reactor, task
+from twisted.python.failure import Failure
+from twisted.python import runtime
from twisted.application import service
-from foolscap.api import eventually
+from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
+from allmydata.util import log
+from allmydata.util.fileutil import precondition_abspath, get_pathinfo
+from allmydata.util.assertutil import precondition
+from allmydata.util.deferredutil import HookMixin
+from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
+ extend_filepath, unicode_from_filepath, unicode_segments_from, \
+ quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
+from allmydata.immutable.upload import FileName, Data
+from allmydata import backupdb, magicpath
-from allmydata.util.fileutil import abspath_expanduser_unicode, precondition_abspath
-from allmydata.util.encodingutil import listdir_unicode, to_filepath, \
- unicode_from_filepath, quote_local_unicode_path, FilenameEncodingError
-from allmydata.immutable.upload import FileName
-from allmydata import backupdb
+IN_EXCL_UNLINK = 0x04000000L
+
+def get_inotify_module():
+ try:
+ if sys.platform == "win32":
+ from allmydata.windows import inotify
+ elif runtime.platform.supportsINotify():
+ from twisted.internet import inotify
+ else:
+ raise NotImplementedError("filesystem notification needed for drop-upload is not supported.\n"
+ "This currently requires Linux or Windows.")
+ return inotify
+ except (ImportError, AttributeError) as e:
+ log.msg(e)
+ if sys.platform == "win32":
+ raise NotImplementedError("filesystem notification needed for drop-upload is not supported.\n"
+ "Windows support requires at least Vista, and has only been tested on Windows 7.")
+ raise
class MagicFolder(service.MultiService):
name = 'magic-folder'
- def __init__(self, client, upload_dircap, local_dir, dbfile, inotify=None,
- pending_delay=1.0):
- precondition_abspath(local_dir)
+ def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
+ pending_delay=1.0, clock=reactor):
+ precondition_abspath(local_path_u)
service.MultiService.__init__(self)
- self._local_dir = abspath_expanduser_unicode(local_dir)
+
+ db = backupdb.get_backupdb(dbfile, create_version=(backupdb.SCHEMA_v3, 3))
+ if db is None:
+ return Failure(Exception('ERROR: Unable to load magic folder db.'))
+
+ # for tests
self._client = client
- self._stats_provider = client.stats_provider
- self._convergence = client.convergence
- self._local_path = to_filepath(self._local_dir)
- self._dbfile = dbfile
+ self._db = db
- self.is_upload_ready = False
+ self.is_ready = False
- if inotify is None:
- if sys.platform == "win32":
- from allmydata.windows import inotify
- else:
- from twisted.internet import inotify
- self._inotify = inotify
+ self.uploader = Uploader(client, local_path_u, db, upload_dircap, pending_delay, clock)
+ self.downloader = Downloader(client, local_path_u, db, collective_dircap, clock)
+
+ def startService(self):
+ # TODO: why is this being called more than once?
+ if self.running:
+ return defer.succeed(None)
+ print "%r.startService" % (self,)
+ service.MultiService.startService(self)
+ return self.uploader.start_monitoring()
+
+ def ready(self):
+ """ready is used to signal us to start
+ processing the upload and download items...
+ """
+ self.is_ready = True
+ d = self.uploader.start_scanning()
+ d2 = self.downloader.start_scanning()
+ d.addCallback(lambda ign: d2)
+ return d
- if not self._local_path.exists():
+ def finish(self):
+ print "finish"
+ d = self.uploader.stop()
+ d2 = self.downloader.stop()
+ d.addCallback(lambda ign: d2)
+ return d
+
+ def remove_service(self):
+ return service.MultiService.disownServiceParent(self)
+
+
+class QueueMixin(HookMixin):
+ def __init__(self, client, local_path_u, db, name, clock):
+ self._client = client
+ self._local_path_u = local_path_u
+ self._local_filepath = to_filepath(local_path_u)
+ self._db = db
+ self._name = name
+ self._clock = clock
+ self._hooks = {'processed': None, 'started': None}
+ self.started_d = self.set_hook('started')
+
+ if not self._local_filepath.exists():
raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
"but there is no directory at that location."
- % quote_local_unicode_path(local_dir))
- if not self._local_path.isdir():
+ % quote_local_unicode_path(self._local_path_u))
+ if not self._local_filepath.isdir():
raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
"but the thing at that location is not a directory."
- % quote_local_unicode_path(local_dir))
+ % quote_local_unicode_path(self._local_path_u))
+
+ self._deque = deque()
+ self._lazy_tail = defer.succeed(None)
+ self._pending = set()
+ self._stopped = False
+ self._turn_delay = 0
+
+ def _get_filepath(self, relpath_u):
+ return extend_filepath(self._local_filepath, relpath_u.split(u"/"))
+
+ def _get_relpath(self, filepath):
+ print "_get_relpath(%r)" % (filepath,)
+ segments = unicode_segments_from(filepath, self._local_filepath)
+ print "segments = %r" % (segments,)
+ return u"/".join(segments)
+
+ def _count(self, counter_name, delta=1):
+ ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
+ print "%r += %r" % (ctr, delta)
+ self._client.stats_provider.count(ctr, delta)
+
+ def _log(self, msg):
+ s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
+ self._client.log(s)
+ print s
+ #open("events", "ab+").write(msg)
+
+ def _append_to_deque(self, relpath_u):
+ print "_append_to_deque(%r)" % (relpath_u,)
+ if relpath_u in self._pending or magicpath.should_ignore_file(relpath_u):
+ return
+ self._deque.append(relpath_u)
+ self._pending.add(relpath_u)
+ self._count('objects_queued')
+ if self.is_ready:
+ self._clock.callLater(0, self._turn_deque)
+
+ def _turn_deque(self):
+ if self._stopped:
+ return
+ try:
+ item = self._deque.pop()
+ self._count('objects_queued', -1)
+ except IndexError:
+ self._log("deque is now empty")
+ self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
+ else:
+ self._lazy_tail.addCallback(lambda ign: self._process(item))
+ self._lazy_tail.addBoth(self._call_hook, 'processed')
+ self._lazy_tail.addErrback(log.err)
+ self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
+
+
+class Uploader(QueueMixin):
+ def __init__(self, client, local_path_u, db, upload_dircap, pending_delay, clock):
+ QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)
+
+ self.is_ready = False
# TODO: allow a path rather than a cap URI.
- self._parent = self._client.create_node_from_uri(upload_dircap)
- if not IDirectoryNode.providedBy(self._parent):
+ self._upload_dirnode = self._client.create_node_from_uri(upload_dircap)
+ if not IDirectoryNode.providedBy(self._upload_dirnode):
raise AssertionError("The URI in 'private/magic_folder_dircap' does not refer to a directory.")
- if self._parent.is_unknown() or self._parent.is_readonly():
+ if self._upload_dirnode.is_unknown() or self._upload_dirnode.is_readonly():
raise AssertionError("The URI in 'private/magic_folder_dircap' is not a writecap to a directory.")
- self._uploaded_callback = lambda ign: None
+ self._inotify = get_inotify_module()
+ self._notifier = self._inotify.INotify()
- self._notifier = inotify.INotify()
if hasattr(self._notifier, 'set_pending_delay'):
self._notifier.set_pending_delay(pending_delay)
# We don't watch for IN_CREATE, because that would cause us to read and upload a
# possibly-incomplete file before the application has closed it. There should always
# be an IN_CLOSE_WRITE after an IN_CREATE (I think).
- # TODO: what about IN_MOVE_SELF or IN_UNMOUNT?
- mask = inotify.IN_CLOSE_WRITE | inotify.IN_MOVED_TO | inotify.IN_ONLYDIR
- self._notifier.watch(self._local_path, mask=mask, callbacks=[self._notify])
-
- def _check_db_file(self, childpath):
- # returns True if the file must be uploaded.
- assert self._db != None
- r = self._db.check_file(childpath)
- filecap = r.was_uploaded()
- if filecap is False:
- return True
+ # TODO: what about IN_MOVE_SELF, IN_MOVED_FROM, or IN_UNMOUNT?
+ #
+ self.mask = ( self._inotify.IN_CLOSE_WRITE
+ | self._inotify.IN_MOVED_TO
+ | self._inotify.IN_MOVED_FROM
+ | self._inotify.IN_DELETE
+ | self._inotify.IN_ONLYDIR
+ | IN_EXCL_UNLINK
+ )
+ self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
+ recursive=True)
- def startService(self):
- self._db = backupdb.get_backupdb(self._dbfile)
- if self._db is None:
- return Failure(Exception('ERROR: Unable to load magic folder db.'))
+ def start_monitoring(self):
+ self._log("start_monitoring")
+ d = defer.succeed(None)
+ d.addCallback(lambda ign: self._notifier.startReading())
+ d.addCallback(lambda ign: self._count('dirs_monitored'))
+ d.addBoth(self._call_hook, 'started')
+ return d
- service.MultiService.startService(self)
- d = self._notifier.startReading()
- self._stats_provider.count('drop_upload.dirs_monitored', 1)
+ def stop(self):
+ self._log("stop")
+ self._notifier.stopReading()
+ self._count('dirs_monitored', -1)
+ if hasattr(self._notifier, 'wait_until_stopped'):
+ d = self._notifier.wait_until_stopped()
+ else:
+ d = defer.succeed(None)
+ d.addCallback(lambda ign: self._lazy_tail)
return d
- def upload_ready(self):
- """upload_ready is used to signal us to start
- processing the upload items...
- """
- self.is_upload_ready = True
+ def start_scanning(self):
+ self._log("start_scanning")
+ self.is_ready = True
+ self._pending = self._db.get_all_relpaths()
+ print "all_files %r" % (self._pending)
+ d = self._scan(u"")
+ def _add_pending(ign):
+ # This adds all of the files that were in the db but not already processed
+ # (normally because they have been deleted on disk).
+ print "adding %r" % (self._pending)
+ self._deque.extend(self._pending)
+ d.addCallback(_add_pending)
+ d.addCallback(lambda ign: self._turn_deque())
+ return d
+
+ def _scan(self, reldir_u):
+ self._log("scan %r" % (reldir_u,))
+ fp = self._get_filepath(reldir_u)
+ try:
+ children = listdir_filepath(fp)
+ except EnvironmentError:
+ raise Exception("WARNING: magic folder: permission denied on directory %s"
+ % quote_filepath(fp))
+ except FilenameEncodingError:
+ raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
+ % quote_filepath(fp))
+
+ d = defer.succeed(None)
+ for child in children:
+ assert isinstance(child, unicode), child
+ d.addCallback(lambda ign, child=child:
+ ("%s/%s" % (reldir_u, child) if reldir_u else child))
+ def _add_pending(relpath_u):
+ if magicpath.should_ignore_file(relpath_u):
+ return None
+
+ self._pending.add(relpath_u)
+ return relpath_u
+ d.addCallback(_add_pending)
+ # This call to _process doesn't go through the deque, and probably should.
+ d.addCallback(self._process)
+ d.addBoth(self._call_hook, 'processed')
+ d.addErrback(log.err)
+
+ return d
def _notify(self, opaque, path, events_mask):
self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
+ relpath_u = self._get_relpath(path)
+ self._append_to_deque(relpath_u)
+
+ def _when_queue_is_empty(self):
+ return defer.succeed(None)
- self._stats_provider.count('drop_upload.files_queued', 1)
- eventually(self._process, opaque, path, events_mask)
+ def _process(self, relpath_u):
+ self._log("_process(%r)" % (relpath_u,))
+ if relpath_u is None:
+ return
+ precondition(isinstance(relpath_u, unicode), relpath_u)
- def _process(self, opaque, path, events_mask):
d = defer.succeed(None)
- # FIXME: if this already exists as a mutable file, we replace the directory entry,
- # but we should probably modify the file (as the SFTP frontend does).
- def _add_file(ign):
- name = path.basename()
- # on Windows the name is already Unicode
- if not isinstance(name, unicode):
- name = name.decode(get_filesystem_encoding())
-
- u = FileName(path.path, self._convergence)
- return self._parent.add_file(name, u)
- d.addCallback(_add_file)
-
- def _succeeded(ign):
- self._stats_provider.count('drop_upload.files_queued', -1)
- self._stats_provider.count('drop_upload.files_uploaded', 1)
- def _failed(f):
- self._stats_provider.count('drop_upload.files_queued', -1)
- if path.exists():
- self._log("drop-upload: %r failed to upload due to %r" % (path.path, f))
- self._stats_provider.count('drop_upload.files_failed', 1)
- return f
+ def _maybe_upload(val):
+ fp = self._get_filepath(relpath_u)
+ pathinfo = get_pathinfo(unicode_from_filepath(fp))
+
+ print "pending = %r, about to remove %r" % (self._pending, relpath_u)
+ self._pending.remove(relpath_u)
+ encoded_path_u = magicpath.path2magic(relpath_u)
+
+ if not pathinfo.exists:
+ self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
+ self._count('objects_disappeared')
+ d2 = defer.succeed(None)
+ if self._db.check_file_db_exists(relpath_u):
+ d2.addCallback(lambda ign: self._get_metadata(encoded_path_u))
+ current_version = self._db.get_local_file_version(relpath_u) + 1
+ def set_deleted(metadata):
+ metadata['version'] = current_version
+ metadata['deleted'] = True
+ empty_uploadable = Data("", self._client.convergence)
+ return self._upload_dirnode.add_file(encoded_path_u, empty_uploadable, overwrite=True, metadata=metadata)
+ d2.addCallback(set_deleted)
+ def add_db_entry(filenode):
+ filecap = filenode.get_uri()
+ self._db.did_upload_version(filecap, relpath_u, current_version, pathinfo)
+ self._count('files_uploaded')
+ # FIXME consider whether it's correct to retrieve the filenode again.
+ d2.addCallback(lambda x: self._get_filenode(encoded_path_u))
+ d2.addCallback(add_db_entry)
+
+ d2.addCallback(lambda x: Exception("file does not exist")) # FIXME wrong
+ return d2
+ elif pathinfo.islink:
+ self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
+ return None
+ elif pathinfo.isdir:
+ self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)
+ uploadable = Data("", self._client.convergence)
+ encoded_path_u += magicpath.path2magic(u"/")
+ upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
+ def _succeeded(ign):
+ self._log("created subdirectory %r" % (relpath_u,))
+ self._count('directories_created')
+ def _failed(f):
+ self._log("failed to create subdirectory %r" % (relpath_u,))
+ return f
+ upload_d.addCallbacks(_succeeded, _failed)
+ upload_d.addCallback(lambda ign: self._scan(relpath_u))
+ return upload_d
+ elif pathinfo.isfile:
+ version = self._db.get_local_file_version(relpath_u)
+ if version is None:
+ version = 0
+ elif self._db.is_new_file(pathinfo, relpath_u):
+ version += 1
+ else:
+ return None
+
+ uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
+ d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":version}, overwrite=True)
+ def add_db_entry(filenode):
+ filecap = filenode.get_uri()
+ self._db.did_upload_version(filecap, relpath_u, version, pathinfo)
+ self._count('files_uploaded')
+ d2.addCallback(add_db_entry)
+ return d2
else:
- self._log("drop-upload: notified file %r disappeared "
- "(this is normal for temporary files): %r" % (path.path, f))
- self._stats_provider.count('drop_upload.files_disappeared', 1)
+ self.warn("WARNING: cannot process special file %s" % quote_filepath(fp))
return None
+
+ d.addCallback(_maybe_upload)
+
+ def _succeeded(res):
+ self._count('objects_succeeded')
+ return res
+ def _failed(f):
+ print f
+ self._count('objects_failed')
+ self._log("%r while processing %r" % (f, relpath_u))
+ return f
d.addCallbacks(_succeeded, _failed)
- d.addBoth(self._uploaded_callback)
return d
- def set_uploaded_callback(self, callback):
- """This sets a function that will be called after a file has been uploaded."""
- self._uploaded_callback = callback
+ def _get_metadata(self, encoded_path_u):
+ try:
+ d = self._upload_dirnode.get_metadata_for(encoded_path_u)
+ except KeyError:
+ return Failure()
+ return d
- def finish(self, for_tests=False):
- self._notifier.stopReading()
- self._stats_provider.count('drop_upload.dirs_monitored', -1)
- if for_tests and hasattr(self._notifier, 'wait_until_stopped'):
- return self._notifier.wait_until_stopped()
+ def _get_filenode(self, encoded_path_u):
+ try:
+ d = self._upload_dirnode.get(encoded_path_u)
+ except KeyError:
+ return Failure()
+ return d
+
+
+class Downloader(QueueMixin):
+ def __init__(self, client, local_path_u, db, collective_dircap, clock):
+ QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)
+
+ # TODO: allow a path rather than a cap URI.
+ self._collective_dirnode = self._client.create_node_from_uri(collective_dircap)
+
+ if not IDirectoryNode.providedBy(self._collective_dirnode):
+ raise AssertionError("The URI in 'private/collective_dircap' does not refer to a directory.")
+ if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly():
+ raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.")
+
+ self._turn_delay = 3 # delay between remote scans
+ self._download_scan_batch = {} # path -> [(filenode, metadata)]
+
+ def start_scanning(self):
+ self._log("\nstart_scanning")
+ files = self._db.get_all_relpaths()
+ self._log("all files %s" % files)
+
+ d = self._scan_remote_collective()
+ self._turn_deque()
+ return d
+
+ def stop(self):
+ self._stopped = True
+ d = defer.succeed(None)
+ d.addCallback(lambda ign: self._lazy_tail)
+ return d
+
+ def _should_download(self, relpath_u, remote_version):
+ """
+ _should_download returns a bool indicating whether or not a remote object should be downloaded.
+ We check the remote metadata version against our magic-folder db version number;
+ latest version wins.
+ """
+ if magicpath.should_ignore_file(relpath_u):
+ return False
+ v = self._db.get_local_file_version(relpath_u)
+ return (v is None or v < remote_version)
+
+ def _get_local_latest(self, relpath_u):
+ """
+ _get_local_latest takes a unicode path string checks to see if this file object
+ exists in our magic-folder db; if not then return None
+ else check for an entry in our magic-folder db and return the version number.
+ """
+ if not self._get_filepath(relpath_u).exists():
+ return None
+ return self._db.get_local_file_version(relpath_u)
+
+ def _get_collective_latest_file(self, filename):
+ """
+ _get_collective_latest_file takes a file path pointing to a file managed by
+ magic-folder and returns a deferred that fires with the two tuple containing a
+ file node and metadata for the latest version of the file located in the
+ magic-folder collective directory.
+ """
+ collective_dirmap_d = self._collective_dirnode.list()
+ def scan_collective(result):
+ list_of_deferreds = []
+ for dir_name in result.keys():
+ # XXX make sure it's a directory
+ d = defer.succeed(None)
+ d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
+ list_of_deferreds.append(d)
+ deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
+ return deferList
+ collective_dirmap_d.addCallback(scan_collective)
+ def highest_version(deferredList):
+ max_version = 0
+ metadata = None
+ node = None
+ for success, result in deferredList:
+ if success:
+ if result[1]['version'] > max_version:
+ node, metadata = result
+ max_version = result[1]['version']
+ return node, metadata
+ collective_dirmap_d.addCallback(highest_version)
+ return collective_dirmap_d
+
+ def _append_to_batch(self, name, file_node, metadata):
+ if self._download_scan_batch.has_key(name):
+ self._download_scan_batch[name] += [(file_node, metadata)]
else:
- return defer.succeed(None)
+ self._download_scan_batch[name] = [(file_node, metadata)]
- def _log(self, msg):
- self._client.log(msg)
- #open("events", "ab+").write(msg)
+ def _scan_remote(self, nickname, dirnode):
+ self._log("_scan_remote nickname %r" % (nickname,))
+ d = dirnode.list()
+ def scan_listing(listing_map):
+ for name in listing_map.keys():
+ file_node, metadata = listing_map[name]
+ local_version = self._get_local_latest(name)
+ remote_version = metadata.get('version', None)
+ self._log("%r has local version %r, remote version %r" % (name, local_version, remote_version))
+ if local_version is None or remote_version is None or local_version < remote_version:
+ self._log("added to download queue\n")
+ self._append_to_batch(name, file_node, metadata)
+ d.addCallback(scan_listing)
+ return d
+
+ def _scan_remote_collective(self):
+ self._log("_scan_remote_collective")
+ self._download_scan_batch = {} # XXX
+
+ if self._collective_dirnode is None:
+ return
+ collective_dirmap_d = self._collective_dirnode.list()
+ def do_list(result):
+ others = [x for x in result.keys()]
+ return result, others
+ collective_dirmap_d.addCallback(do_list)
+ def scan_collective(result):
+ d = defer.succeed(None)
+ collective_dirmap, others_list = result
+ for dir_name in others_list:
+ d.addCallback(lambda x, dir_name=dir_name: self._scan_remote(dir_name, collective_dirmap[dir_name][0]))
+ # XXX todo add errback
+ return d
+ collective_dirmap_d.addCallback(scan_collective)
+ collective_dirmap_d.addCallback(self._filter_scan_batch)
+ collective_dirmap_d.addCallback(self._add_batch_to_download_queue)
+ return collective_dirmap_d
+
+ def _add_batch_to_download_queue(self, result):
+ print "result = %r" % (result,)
+ print "deque = %r" % (self._deque,)
+ self._deque.extend(result)
+ print "deque after = %r" % (self._deque,)
+ self._count('objects_queued', len(result))
+ print "pending = %r" % (self._pending,)
+ self._pending.update(map(lambda x: x[0], result))
+ print "pending after = %r" % (self._pending,)
+
+ def _filter_scan_batch(self, result):
+ extension = [] # consider whether this should be a dict
+ for relpath_u in self._download_scan_batch.keys():
+ if relpath_u in self._pending:
+ continue
+ file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version'])
+ if self._should_download(relpath_u, metadata['version']):
+ extension += [(relpath_u, file_node, metadata)]
+ return extension
+
+ def _when_queue_is_empty(self):
+ d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
+ d.addCallback(lambda ign: self._turn_deque())
+ return d
+
+ def _process(self, item):
+ (relpath_u, file_node, metadata) = item
+ d = file_node.download_best_version()
+ def succeeded(res):
+ fp = self._get_filepath(relpath_u)
+ abspath_u = unicode_from_filepath(fp)
+ d2 = defer.succeed(res)
+ d2.addCallback(lambda result: self._write_downloaded_file(abspath_u, result, is_conflict=False))
+ def do_update_db(written_abspath_u):
+ filecap = file_node.get_uri()
+ written_pathinfo = get_pathinfo(written_abspath_u)
+ if not written_pathinfo.exists:
+ raise Exception("downloaded file %s disappeared" % quote_local_unicode_path(written_abspath_u))
+
+ self._db.did_upload_version(filecap, relpath_u, metadata['version'], written_pathinfo)
+ d2.addCallback(do_update_db)
+ # XXX handle failure here with addErrback...
+ self._count('objects_downloaded')
+ return d2
+ def failed(f):
+ self._log("download failed: %s" % (str(f),))
+ self._count('objects_download_failed')
+ return f
+ d.addCallbacks(succeeded, failed)
+ def remove_from_pending(res):
+ self._pending.remove(relpath_u)
+ return res
+ d.addBoth(remove_from_pending)
+ return d
+
+ FUDGE_SECONDS = 10.0
+
+ @classmethod
+ def _write_downloaded_file(cls, abspath_u, file_contents, is_conflict=False, now=None):
+ # 1. Write a temporary file, say .foo.tmp.
+ # 2. is_conflict determines whether this is an overwrite or a conflict.
+ # 3. Set the mtime of the replacement file to be T seconds before the
+ # current local time.
+ # 4. Perform a file replacement with backup filename foo.backup,
+ # replaced file foo, and replacement file .foo.tmp. If any step of
+ # this operation fails, reclassify as a conflict and stop.
+ #
+ # Returns the path of the destination file.
+
+ precondition_abspath(abspath_u)
+ replacement_path_u = abspath_u + u".tmp" # FIXME more unique
+ backup_path_u = abspath_u + u".backup"
+ if now is None:
+ now = time.time()
+
+ fileutil.write(replacement_path_u, file_contents)
+ os.utime(replacement_path_u, (now, now - cls.FUDGE_SECONDS))
+ if is_conflict:
+ return cls._rename_conflicted_file(abspath_u, replacement_path_u)
+ else:
+ try:
+ fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
+ return abspath_u
+ except fileutil.ConflictError:
+ return cls._rename_conflicted_file(abspath_u, replacement_path_u)
+
+ @classmethod
+ def _rename_conflicted_file(self, abspath_u, replacement_path_u):
+ conflict_path_u = abspath_u + u".conflict"
+ fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
+ return conflict_path_u
--- /dev/null
+
+import re
+import os.path
+
+from allmydata.util.assertutil import precondition
+
+def path2magic(path):
+ return re.sub(ur'[/@]', lambda m: {u'/': u'@_', u'@': u'@@'}[m.group(0)], path)
+
+def magic2path(path):
+ return re.sub(ur'@[_@]', lambda m: {u'@_': u'/', u'@@': u'@'}[m.group(0)], path)
+
+
+IGNORE_SUFFIXES = [u'.backup', u'.tmp', u'.conflicted']
+IGNORE_PREFIXES = [u'.']
+
+def should_ignore_file(path_u):
+ precondition(isinstance(path_u, unicode), path_u=path_u)
+
+ for suffix in IGNORE_SUFFIXES:
+ if path_u.endswith(suffix):
+ return True
+ while path_u != u"":
+ path_u, tail_u = os.path.split(path_u)
+ if tail_u.startswith(u"."):
+ return True
+ return False
--- /dev/null
+
+import os
+from cStringIO import StringIO
+from twisted.python import usage
+
+from .common import BaseOptions, BasedirOptions, get_aliases
+from .cli import MakeDirectoryOptions, LnOptions, CreateAliasOptions
+import tahoe_mv
+from allmydata.util import fileutil
+from allmydata import uri
+
+INVITE_SEPARATOR = "+"
+
+class CreateOptions(BasedirOptions):
+ nickname = None
+ localdir = None
+ synopsis = "MAGIC_ALIAS: [NICKNAME LOCALDIR]"
+ def parseArgs(self, alias, nickname=None, localdir=None):
+ BasedirOptions.parseArgs(self)
+ if not alias.endswith(':'):
+ raise usage.UsageError("An alias must end with a ':' character.")
+ self.alias = alias[:-1]
+ self.nickname = nickname
+ self.localdir = localdir
+ if self.nickname and not self.localdir:
+ raise usage.UsageError("If NICKNAME is specified then LOCALDIR must also be specified.")
+ node_url_file = os.path.join(self['node-directory'], "node.url")
+ self['node-url'] = fileutil.read(node_url_file).strip()
+
+def _delegate_options(source_options, target_options):
+ target_options.aliases = get_aliases(source_options['node-directory'])
+ target_options["node-url"] = source_options["node-url"]
+ target_options["node-directory"] = source_options["node-directory"]
+ target_options.stdin = StringIO("")
+ target_options.stdout = StringIO()
+ target_options.stderr = StringIO()
+ return target_options
+
+def create(options):
+ from allmydata.scripts import tahoe_add_alias
+ create_alias_options = _delegate_options(options, CreateAliasOptions())
+ create_alias_options.alias = options.alias
+
+ rc = tahoe_add_alias.create_alias(create_alias_options)
+ if rc != 0:
+ print >>options.stderr, create_alias_options.stderr.getvalue()
+ return rc
+ print >>options.stdout, create_alias_options.stdout.getvalue()
+
+ if options.nickname is not None:
+ invite_options = _delegate_options(options, InviteOptions())
+ invite_options.alias = options.alias
+ invite_options.nickname = options.nickname
+ rc = invite(invite_options)
+ if rc != 0:
+ print >>options.stderr, "magic-folder: failed to invite after create\n"
+ print >>options.stderr, invite_options.stderr.getvalue()
+ return rc
+ invite_code = invite_options.stdout.getvalue().strip()
+
+ join_options = _delegate_options(options, JoinOptions())
+ join_options.invite_code = invite_code
+ fields = invite_code.split(INVITE_SEPARATOR)
+ if len(fields) != 2:
+ raise usage.UsageError("Invalid invite code.")
+ join_options.magic_readonly_cap, join_options.dmd_write_cap = fields
+ join_options.local_dir = options.localdir
+ rc = join(join_options)
+ if rc != 0:
+ print >>options.stderr, "magic-folder: failed to join after create\n"
+ print >>options.stderr, join_options.stderr.getvalue()
+ return rc
+ return 0
+
+class InviteOptions(BasedirOptions):
+ nickname = None
+ synopsis = "MAGIC_ALIAS: NICKNAME"
+ stdin = StringIO("")
+ def parseArgs(self, alias, nickname=None):
+ BasedirOptions.parseArgs(self)
+ if not alias.endswith(':'):
+ raise usage.UsageError("An alias must end with a ':' character.")
+ self.alias = alias[:-1]
+ self.nickname = nickname
+ node_url_file = os.path.join(self['node-directory'], "node.url")
+ self['node-url'] = open(node_url_file, "r").read().strip()
+ aliases = get_aliases(self['node-directory'])
+ self.aliases = aliases
+
+def invite(options):
+ from allmydata.scripts import tahoe_mkdir
+ mkdir_options = _delegate_options(options, MakeDirectoryOptions())
+ mkdir_options.where = None
+
+ rc = tahoe_mkdir.mkdir(mkdir_options)
+ if rc != 0:
+ print >>options.stderr, "magic-folder: failed to mkdir\n"
+ return rc
+ dmd_write_cap = mkdir_options.stdout.getvalue().strip()
+ dmd_readonly_cap = unicode(uri.from_string(dmd_write_cap).get_readonly().to_string(), 'utf-8')
+ if dmd_readonly_cap is None:
+ print >>options.stderr, "magic-folder: failed to diminish dmd write cap\n"
+ return 1
+
+ magic_write_cap = get_aliases(options["node-directory"])[options.alias]
+ magic_readonly_cap = unicode(uri.from_string(magic_write_cap).get_readonly().to_string(), 'utf-8')
+ # tahoe ln CLIENT_READCAP COLLECTIVE_WRITECAP/NICKNAME
+ ln_options = _delegate_options(options, LnOptions())
+ ln_options.from_file = dmd_readonly_cap
+ ln_options.to_file = "%s/%s" % (magic_write_cap, options.nickname)
+ rc = tahoe_mv.mv(ln_options, mode="link")
+ if rc != 0:
+ print >>options.stderr, "magic-folder: failed to create link\n"
+ print >>options.stderr, ln_options.stderr.getvalue()
+ return rc
+
+ print >>options.stdout, "%s%s%s" % (magic_readonly_cap, INVITE_SEPARATOR, dmd_write_cap)
+ return 0
+
+class JoinOptions(BasedirOptions):
+ synopsis = "INVITE_CODE LOCAL_DIR"
+ dmd_write_cap = ""
+ magic_readonly_cap = ""
+ def parseArgs(self, invite_code, local_dir):
+ BasedirOptions.parseArgs(self)
+ self.local_dir = local_dir
+ fields = invite_code.split(INVITE_SEPARATOR)
+ if len(fields) != 2:
+ raise usage.UsageError("Invalid invite code.")
+ self.magic_readonly_cap, self.dmd_write_cap = fields
+
+def join(options):
+ dmd_cap_file = os.path.join(options["node-directory"], "private/magic_folder_dircap")
+ collective_readcap_file = os.path.join(options["node-directory"], "private/collective_dircap")
+
+ fileutil.write(dmd_cap_file, options.dmd_write_cap)
+ fileutil.write(collective_readcap_file, options.magic_readonly_cap)
+ fileutil.write(os.path.join(options["node-directory"], "tahoe.cfg"),
+ "[magic_folder]\nenabled = True\nlocal.directory = %s\n"
+ % (options.local_dir.encode('utf-8'),), mode="ab")
+ return 0
+
+class MagicFolderCommand(BaseOptions):
+ subCommands = [
+ ["create", None, CreateOptions, "Create a Magic Folder."],
+ ["invite", None, InviteOptions, "Invite someone to a Magic Folder."],
+ ["join", None, JoinOptions, "Join a Magic Folder."],
+ ]
+ def postOptions(self):
+ if not hasattr(self, 'subOptions'):
+ raise usage.UsageError("must specify a subcommand")
+ def getSynopsis(self):
+ return "Usage: tahoe [global-options] magic SUBCOMMAND"
+ def getUsage(self, width=None):
+ t = BaseOptions.getUsage(self, width)
+ t += """\
+Please run e.g. 'tahoe magic-folder create --help' for more details on each
+subcommand.
+"""
+ return t
+
+subDispatch = {
+ "create": create,
+ "invite": invite,
+ "join": join,
+}
+
+def do_magic_folder(options):
+ so = options.subOptions
+ so.stdout = options.stdout
+ so.stderr = options.stderr
+ f = subDispatch[options.subCommand]
+ return f(so)
+
+subCommands = [
+ ["magic-folder", None, MagicFolderCommand,
+ "Magic Folder subcommands: use 'tahoe magic-folder' for a list."],
+]
+
+dispatch = {
+ "magic-folder": do_magic_folder,
+}
from twisted.python import usage
from allmydata.scripts.common import get_default_nodedir
-from allmydata.scripts import debug, create_node, startstop_node, cli, keygen, stats_gatherer, admin
+from allmydata.scripts import debug, create_node, startstop_node, cli, keygen, stats_gatherer, admin, \
+magic_folder_cli
from allmydata.util.encodingutil import quote_output, quote_local_unicode_path, get_io_encoding
def GROUP(s):
+ debug.subCommands
+ GROUP("Using the filesystem")
+ cli.subCommands
+ + magic_folder_cli.subCommands
)
optFlags = [
rc = admin.dispatch[command](so)
elif command in cli.dispatch:
rc = cli.dispatch[command](so)
+ elif command in magic_folder_cli.dispatch:
+ rc = magic_folder_cli.dispatch[command](so)
elif command in ac_dispatch:
rc = ac_dispatch[command](so, stdout, stderr)
else:
line.append(uri)
if options["readonly-uri"]:
line.append(quote_output(ro_uri or "-", quotemarks=False))
-
rows.append((encoding_error, line))
-
max_widths = []
left_justifys = []
for (encoding_error, row) in rows:
from twisted.python.failure import Failure
from foolscap.api import Referenceable, fireEventually, RemoteException
from base64 import b32encode
+
+from allmydata.util.assertutil import _assert
+
from allmydata import uri as tahoe_uri
from allmydata.client import Client
from allmydata.storage.server import StorageServer, storage_index_to_dir
return None
class NoNetworkClient(Client):
+
+ def disownServiceParent(self):
+ self.disownServiceParent()
def create_tub(self):
pass
def init_introducer_client(self):
self.proxies_by_id = {} # maps to IServer on which .rref is a wrapped
# StorageServer
self.clients = []
+ self.client_config_hooks = client_config_hooks
for i in range(num_servers):
ss = self.make_server(i)
self.rebuild_serverlist()
for i in range(num_clients):
- clientid = hashutil.tagged_hash("clientid", str(i))[:20]
- clientdir = os.path.join(basedir, "clients",
- idlib.shortnodeid_b2a(clientid))
- fileutil.make_dirs(clientdir)
- f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
+ c = self.make_client(i)
+ self.clients.append(c)
+
+ def make_client(self, i, write_config=True):
+ clientid = hashutil.tagged_hash("clientid", str(i))[:20]
+ clientdir = os.path.join(self.basedir, "clients",
+ idlib.shortnodeid_b2a(clientid))
+ fileutil.make_dirs(clientdir)
+
+ tahoe_cfg_path = os.path.join(clientdir, "tahoe.cfg")
+ if write_config:
+ f = open(tahoe_cfg_path, "w")
f.write("[node]\n")
f.write("nickname = client-%d\n" % i)
f.write("web.port = tcp:0:interface=127.0.0.1\n")
f.write("[storage]\n")
f.write("enabled = false\n")
f.close()
- c = None
- if i in client_config_hooks:
- # this hook can either modify tahoe.cfg, or return an
- # entirely new Client instance
- c = client_config_hooks[i](clientdir)
- if not c:
- c = NoNetworkClient(clientdir)
- c.set_default_mutable_keysize(TEST_RSA_KEY_SIZE)
- c.nodeid = clientid
- c.short_nodeid = b32encode(clientid).lower()[:8]
- c._servers = self.all_servers # can be updated later
- c.setServiceParent(self)
- self.clients.append(c)
+ else:
+ _assert(os.path.exists(tahoe_cfg_path), tahoe_cfg_path=tahoe_cfg_path)
+
+ c = None
+ if i in self.client_config_hooks:
+ # this hook can either modify tahoe.cfg, or return an
+ # entirely new Client instance
+ c = self.client_config_hooks[i](clientdir)
+
+ if not c:
+ c = NoNetworkClient(clientdir)
+ c.set_default_mutable_keysize(TEST_RSA_KEY_SIZE)
+
+ c.nodeid = clientid
+ c.short_nodeid = b32encode(clientid).lower()[:8]
+ c._servers = self.all_servers # can be updated later
+ c.setServiceParent(self)
+ return c
def make_server(self, i, readonly=False):
serverid = hashutil.tagged_hash("serverid", str(i))[:20]
num_servers=num_servers,
client_config_hooks=client_config_hooks)
self.g.setServiceParent(self.s)
+ self._record_webports_and_baseurls()
+
+ def _record_webports_and_baseurls(self):
self.client_webports = [c.getServiceNamed("webish").getPortnum()
for c in self.g.clients]
self.client_baseurls = [c.getServiceNamed("webish").getURL()
def get_clientdir(self, i=0):
return self.g.clients[i].basedir
+ def set_clientdir(self, basedir, i=0):
+ self.g.clients[i].basedir = basedir
+
+ def get_client(self, i=0):
+ return self.g.clients[i]
+
+ def restart_client(self, i=0):
+ client = self.g.clients[i]
+ d = defer.succeed(None)
+ d.addCallback(lambda ign: self.g.removeService(client))
+ def _make_client(ign):
+ c = self.g.make_client(i, write_config=False)
+ self.g.clients[i] = c
+ self._record_webports_and_baseurls()
+ d.addCallback(_make_client)
+ return d
+
def get_serverdir(self, i):
return self.g.servers_by_number[i].storedir
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.encodingutil import get_io_encoding, unicode_to_argv
from allmydata.util.namespace import Namespace
-from allmydata.scripts import cli, backupdb
+from allmydata.scripts import cli
+from allmydata import backupdb
from .common_util import StallMixin
from .no_network import GridTestMixin
from .test_cli import CLITestMixin, parse_options
--- /dev/null
+import os.path
+import re
+
+from twisted.trial import unittest
+from twisted.internet import defer
+from twisted.internet import reactor
+
+from allmydata.util import fileutil
+from allmydata.scripts.common import get_aliases
+from allmydata.test.no_network import GridTestMixin
+from .test_cli import CLITestMixin
+from allmydata.scripts import magic_folder_cli
+from allmydata.util.fileutil import abspath_expanduser_unicode
+from allmydata.frontends.magic_folder import MagicFolder
+from allmydata import uri
+
+
+class MagicFolderCLITestMixin(CLITestMixin, GridTestMixin):
+
+ def do_create_magic_folder(self, client_num):
+ d = self.do_cli("magic-folder", "create", "magic:", client_num=client_num)
+ def _done((rc,stdout,stderr)):
+ self.failUnlessEqual(rc, 0)
+ self.failUnlessIn("Alias 'magic' created", stdout)
+ self.failUnlessEqual(stderr, "")
+ aliases = get_aliases(self.get_clientdir(i=client_num))
+ self.failUnlessIn("magic", aliases)
+ self.failUnless(aliases["magic"].startswith("URI:DIR2:"))
+ d.addCallback(_done)
+ return d
+
+ def do_invite(self, client_num, nickname):
+ d = self.do_cli("magic-folder", "invite", u"magic:", nickname, client_num=client_num)
+ def _done((rc,stdout,stderr)):
+ self.failUnless(rc == 0)
+ return (rc,stdout,stderr)
+ d.addCallback(_done)
+ return d
+
+ def do_join(self, client_num, local_dir, invite_code):
+ magic_readonly_cap, dmd_write_cap = invite_code.split(magic_folder_cli.INVITE_SEPARATOR)
+ d = self.do_cli("magic-folder", "join", invite_code, local_dir, client_num=client_num)
+ def _done((rc,stdout,stderr)):
+ self.failUnless(rc == 0)
+ return (rc,stdout,stderr)
+ d.addCallback(_done)
+ return d
+
+ def check_joined_config(self, client_num, upload_dircap):
+ """Tests that our collective directory has the readonly cap of
+ our upload directory.
+ """
+ collective_readonly_cap = fileutil.read(os.path.join(self.get_clientdir(i=client_num), "private/collective_dircap"))
+ d = self.do_cli("ls", "--json", collective_readonly_cap, client_num=client_num)
+ def _done((rc,stdout,stderr)):
+ self.failUnless(rc == 0)
+ return (rc,stdout,stderr)
+ d.addCallback(_done)
+ def test_joined_magic_folder((rc,stdout,stderr)):
+ readonly_cap = unicode(uri.from_string(upload_dircap).get_readonly().to_string(), 'utf-8')
+ s = re.search(readonly_cap, stdout)
+ self.failUnless(s is not None)
+ return None
+ d.addCallback(test_joined_magic_folder)
+ return d
+
+ def get_caps_from_files(self, client_num):
+ collective_dircap = fileutil.read(os.path.join(self.get_clientdir(i=client_num), "private/collective_dircap"))
+ upload_dircap = fileutil.read(os.path.join(self.get_clientdir(i=client_num), "private/magic_folder_dircap"))
+ self.failIf(collective_dircap is None or upload_dircap is None)
+ return collective_dircap, upload_dircap
+
+ def check_config(self, client_num, local_dir):
+ client_config = fileutil.read(os.path.join(self.get_clientdir(i=client_num), "tahoe.cfg"))
+ # XXX utf-8?
+ local_dir = local_dir.encode('utf-8')
+ ret = re.search("\[magic_folder\]\nenabled = True\nlocal.directory = %s" % (local_dir,), client_config)
+ self.failIf(ret is None)
+
+ def create_invite_join_magic_folder(self, nickname, local_dir):
+ d = self.do_cli("magic-folder", "create", u"magic:", nickname, local_dir)
+ def _done((rc,stdout,stderr)):
+ self.failUnless(rc == 0)
+ return (rc,stdout,stderr)
+ d.addCallback(_done)
+ def get_alice_caps(x):
+ client = self.get_client()
+ self.collective_dircap, self.upload_dircap = self.get_caps_from_files(0)
+ self.collective_dirnode = client.create_node_from_uri(self.collective_dircap)
+ self.upload_dirnode = client.create_node_from_uri(self.upload_dircap)
+ d.addCallback(get_alice_caps)
+ d.addCallback(lambda x: self.check_joined_config(0, self.upload_dircap))
+ d.addCallback(lambda x: self.check_config(0, local_dir))
+ return d
+
+ def cleanup(self, res):
+ #print "cleanup", res
+ d = defer.succeed(None)
+ if self.magicfolder is not None:
+ d.addCallback(lambda ign: self.magicfolder.finish())
+ d.addCallback(lambda ign: res)
+ return d
+
+ def init_magicfolder(self, client_num, upload_dircap, collective_dircap, local_magic_dir, clock):
+ dbfile = abspath_expanduser_unicode(u"magicfolderdb.sqlite", base=self.get_clientdir(i=client_num))
+ magicfolder = MagicFolder(self.get_client(client_num), upload_dircap, collective_dircap, local_magic_dir,
+ dbfile, pending_delay=0.2, clock=clock)
+ magicfolder.setServiceParent(self.get_client(client_num))
+ magicfolder.ready()
+ return magicfolder
+
+ def setup_alice_and_bob(self, clock=reactor):
+ self.set_up_grid(num_clients=2)
+
+ alice_magic_dir = abspath_expanduser_unicode(u"Alice-magic", base=self.basedir)
+ self.mkdir_nonascii(alice_magic_dir)
+ bob_magic_dir = abspath_expanduser_unicode(u"Bob-magic", base=self.basedir)
+ self.mkdir_nonascii(bob_magic_dir)
+
+ # Alice creates a Magic Folder,
+ # invites herself then and joins.
+ d = self.do_create_magic_folder(0)
+ d.addCallback(lambda x: self.do_invite(0, u"Alice\u00F8"))
+ def get_invitecode(result):
+ self.invitecode = result[1].strip()
+ d.addCallback(get_invitecode)
+ d.addCallback(lambda x: self.do_join(0, alice_magic_dir, self.invitecode))
+ def get_alice_caps(x):
+ self.alice_collective_dircap, self.alice_upload_dircap = self.get_caps_from_files(0)
+ d.addCallback(get_alice_caps)
+ d.addCallback(lambda x: self.check_joined_config(0, self.alice_upload_dircap))
+ d.addCallback(lambda x: self.check_config(0, alice_magic_dir))
+ def get_Alice_magicfolder(result):
+ self.alice_magicfolder = self.init_magicfolder(0, self.alice_upload_dircap, self.alice_collective_dircap, alice_magic_dir, clock)
+ return result
+ d.addCallback(get_Alice_magicfolder)
+
+ # Alice invites Bob. Bob joins.
+ d.addCallback(lambda x: self.do_invite(0, u"Bob\u00F8"))
+ def get_invitecode(result):
+ self.invitecode = result[1].strip()
+ d.addCallback(get_invitecode)
+ d.addCallback(lambda x: self.do_join(1, bob_magic_dir, self.invitecode))
+ def get_bob_caps(x):
+ self.bob_collective_dircap, self.bob_upload_dircap = self.get_caps_from_files(1)
+ d.addCallback(get_bob_caps)
+ d.addCallback(lambda x: self.check_joined_config(1, self.bob_upload_dircap))
+ d.addCallback(lambda x: self.check_config(1, bob_magic_dir))
+ def get_Bob_magicfolder(result):
+ self.bob_magicfolder = self.init_magicfolder(1, self.bob_upload_dircap, self.bob_collective_dircap, bob_magic_dir, clock)
+ return result
+ d.addCallback(get_Bob_magicfolder)
+
+ def prepare_result(result):
+ # XXX improve this
+ return (self.alice_collective_dircap, self.alice_upload_dircap, self.alice_magicfolder,
+ self.bob_collective_dircap, self.bob_upload_dircap, self.bob_magicfolder)
+ d.addCallback(prepare_result)
+ return d
+
+
+class CreateMagicFolder(MagicFolderCLITestMixin, unittest.TestCase):
+
+ def test_create_and_then_invite_join(self):
+ self.basedir = "cli/MagicFolder/create-and-then-invite-join"
+ self.set_up_grid()
+ self.local_dir = os.path.join(self.basedir, "magic")
+ d = self.do_create_magic_folder(0)
+ d.addCallback(lambda x: self.do_invite(0, u"Alice"))
+ def get_invite((rc,stdout,stderr)):
+ self.invite_code = stdout.strip()
+ d.addCallback(get_invite)
+ d.addCallback(lambda x: self.do_join(0, self.local_dir, self.invite_code))
+ def get_caps(x):
+ self.collective_dircap, self.upload_dircap = self.get_caps_from_files(0)
+ d.addCallback(get_caps)
+ d.addCallback(lambda x: self.check_joined_config(0, self.upload_dircap))
+ d.addCallback(lambda x: self.check_config(0, self.local_dir))
+ return d
+
+ def test_create_error(self):
+ self.basedir = "cli/MagicFolder/create-error"
+ self.set_up_grid()
+ self.local_dir = os.path.join(self.basedir, "magic")
+ d = self.do_cli("magic-folder", "create", "m a g i c:", client_num=0)
+ def _done((rc,stdout,stderr)):
+ self.failIfEqual(rc, 0)
+ self.failUnlessIn("Alias names cannot contain spaces.", stderr)
+ d.addCallback(_done)
+ return d
+
+ def test_create_invite_join(self):
+ self.basedir = "cli/MagicFolder/create-invite-join"
+ self.set_up_grid()
+ self.local_dir = os.path.join(self.basedir, "magic")
+ d = self.do_cli("magic-folder", "create", u"magic:", u"Alice", self.local_dir)
+ def _done((rc,stdout,stderr)):
+ self.failUnless(rc == 0)
+ return (rc,stdout,stderr)
+ d.addCallback(_done)
+ def get_caps(x):
+ self.collective_dircap, self.upload_dircap = self.get_caps_from_files(0)
+ d.addCallback(get_caps)
+ d.addCallback(lambda x: self.check_joined_config(0, self.upload_dircap))
+ d.addCallback(lambda x: self.check_config(0, self.local_dir))
+ return d
class MockMagicFolder(service.MultiService):
name = 'magic-folder'
- def __init__(self, client, upload_dircap, local_dir, dbfile, inotify=None,
+ def __init__(self, client, upload_dircap, collective_dircap, local_dir, dbfile, inotify=None,
pending_delay=1.0):
service.MultiService.__init__(self)
self.client = client
self.upload_dircap = upload_dircap
+ self.collective_dircap = collective_dircap
self.local_dir = local_dir
self.dbfile = dbfile
self.inotify = inotify
+ def ready(self):
+ pass
+
self.patch(allmydata.frontends.magic_folder, 'MagicFolder', MockMagicFolder)
upload_dircap = "URI:DIR2:blah"
basedir1 = "test_client.Basic.test_create_magic_folder_service1"
os.mkdir(basedir1)
+
fileutil.write(os.path.join(basedir1, "tahoe.cfg"),
config + "local.directory = " + local_dir_utf8 + "\n")
self.failUnlessRaises(MissingConfigEntry, client.Client, basedir1)
fileutil.write(os.path.join(basedir1, "tahoe.cfg"), config)
fileutil.write(os.path.join(basedir1, "private", "magic_folder_dircap"), "URI:DIR2:blah")
+ fileutil.write(os.path.join(basedir1, "private", "collective_dircap"), "URI:DIR2:meow")
self.failUnlessRaises(MissingConfigEntry, client.Client, basedir1)
fileutil.write(os.path.join(basedir1, "tahoe.cfg"),
class Boom(Exception):
pass
- def BoomMagicFolder(client, upload_dircap, local_dir_utf8, inotify=None):
+ def BoomMagicFolder(client, upload_dircap, collective_dircap, local_dir, dbfile,
+ inotify=None, pending_delay=1.0):
raise Boom()
self.patch(allmydata.frontends.magic_folder, 'MagicFolder', BoomMagicFolder)
- logged_messages = []
- def mock_log(*args, **kwargs):
- logged_messages.append("%r %r" % (args, kwargs))
- self.patch(allmydata.util.log, 'msg', mock_log)
-
basedir2 = "test_client.Basic.test_create_magic_folder_service2"
os.mkdir(basedir2)
os.mkdir(os.path.join(basedir2, "private"))
"enabled = true\n" +
"local.directory = " + local_dir_utf8 + "\n")
fileutil.write(os.path.join(basedir2, "private", "magic_folder_dircap"), "URI:DIR2:blah")
- c2 = client.Client(basedir2)
- self.failUnlessRaises(KeyError, c2.getServiceNamed, 'magic-folder')
- self.failUnless([True for arg in logged_messages if "Boom" in arg],
- logged_messages)
+ fileutil.write(os.path.join(basedir2, "private", "collective_dircap"), "URI:DIR2:meow")
+ self.failUnlessRaises(Boom, client.Client, basedir2)
def flush_but_dont_ignore(res):
import os, sys
from twisted.trial import unittest
-from twisted.python import runtime
from twisted.internet import defer
-from allmydata.interfaces import IDirectoryNode, NoSuchChildError
+from allmydata.interfaces import IDirectoryNode
from allmydata.util import fake_inotify, fileutil
from allmydata.util.encodingutil import get_filesystem_encoding, to_filepath
from allmydata.test.no_network import GridTestMixin
from allmydata.test.common_util import ReallyEqualMixin, NonASCIIPathMixin
from allmydata.test.common import ShouldFailMixin
+from .test_cli_magic_folder import MagicFolderCLITestMixin
-from allmydata.frontends.magic_folder import MagicFolder
+from allmydata.frontends import magic_folder
+from allmydata.frontends.magic_folder import MagicFolder, Downloader
+from allmydata import backupdb, magicpath
from allmydata.util.fileutil import abspath_expanduser_unicode
-class MagicFolderTestMixin(GridTestMixin, ShouldFailMixin, ReallyEqualMixin, NonASCIIPathMixin):
+class MagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, ReallyEqualMixin, NonASCIIPathMixin):
"""
These tests will be run both with a mock notifier, and (on platforms that support it)
with the real INotify.
GridTestMixin.setUp(self)
temp = self.mktemp()
self.basedir = abspath_expanduser_unicode(temp.decode(get_filesystem_encoding()))
- def _get_count(self, name):
- return self.stats_provider.get_stats()["counters"].get(name, 0)
+ self.magicfolder = None
+
+ def _get_count(self, name, client=None):
+ counters = (client or self.get_client()).stats_provider.get_stats()["counters"]
+ return counters.get('magic_folder.%s' % (name,), 0)
+
+ def _createdb(self):
+ dbfile = abspath_expanduser_unicode(u"magicfolderdb.sqlite", base=self.basedir)
+ bdb = backupdb.get_backupdb(dbfile, create_version=(backupdb.SCHEMA_v3, 3))
+ self.failUnless(bdb, "unable to create backupdb from %r" % (dbfile,))
+ self.failUnlessEqual(bdb.VERSION, 3)
+ return bdb
+
+ def _restart_client(self, ign):
+ #print "_restart_client"
+ d = self.restart_client()
+ d.addCallback(self._wait_until_started)
+ return d
- def _test(self):
- self.uploader = None
+ def _wait_until_started(self, ign):
+ #print "_wait_until_started"
+ self.magicfolder = self.get_client().getServiceNamed('magic-folder')
+ return self.magicfolder.ready()
+
+ def test_db_basic(self):
+ fileutil.make_dirs(self.basedir)
+ self._createdb()
+
+ def test_db_persistence(self):
+ """Test that a file upload creates an entry in the database."""
+
+ fileutil.make_dirs(self.basedir)
+ db = self._createdb()
+
+ relpath1 = u"myFile1"
+ pathinfo = fileutil.PathInfo(isdir=False, isfile=True, islink=False,
+ exists=True, size=1, mtime=123, ctime=456)
+ db.did_upload_version('URI:LIT:1', relpath1, 0, pathinfo)
+
+ c = db.cursor
+ c.execute("SELECT size, mtime, ctime"
+ " FROM local_files"
+ " WHERE path=?",
+ (relpath1,))
+ row = c.fetchone()
+ self.failUnlessEqual(row, (pathinfo.size, pathinfo.mtime, pathinfo.ctime))
+
+ # Second test uses db.is_new_file instead of SQL query directly
+ # to confirm the previous upload entry in the db.
+ relpath2 = u"myFile2"
+ path2 = os.path.join(self.basedir, relpath2)
+ fileutil.write(path2, "meow\n")
+ pathinfo = fileutil.get_pathinfo(path2)
+ db.did_upload_version('URI:LIT:2', relpath2, 0, pathinfo)
+ self.failUnlessFalse(db.is_new_file(pathinfo, relpath2))
+
+ different_pathinfo = fileutil.PathInfo(isdir=False, isfile=True, islink=False,
+ exists=True, size=0, mtime=pathinfo.mtime, ctime=pathinfo.ctime)
+ self.failUnlessTrue(db.is_new_file(different_pathinfo, relpath2))
+
+ def test_magicfolder_start_service(self):
self.set_up_grid()
- self.local_dir = os.path.join(self.basedir, self.unicode_or_fallback(u"loc\u0101l_dir", u"local_dir"))
+
+ self.local_dir = abspath_expanduser_unicode(self.unicode_or_fallback(u"l\u00F8cal_dir", u"local_dir"),
+ base=self.basedir)
self.mkdir_nonascii(self.local_dir)
- self.client = self.g.clients[0]
- self.stats_provider = self.client.stats_provider
+ d = defer.succeed(None)
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.dirs_monitored'), 0))
- d = self.client.create_dirnode()
- def _made_upload_dir(n):
- self.failUnless(IDirectoryNode.providedBy(n))
- self.upload_dirnode = n
- self.upload_dircap = n.get_uri()
- self.uploader = DropUploader(self.client, self.upload_dircap, self.local_dir.encode('utf-8'),
- inotify=self.inotify)
- return self.uploader.startService()
- d.addCallback(_made_upload_dir)
+ d.addCallback(lambda ign: self.create_invite_join_magic_folder(u"Alice", self.local_dir))
+ d.addCallback(self._restart_client)
+
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.dirs_monitored'), 1))
+ d.addBoth(self.cleanup)
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.dirs_monitored'), 0))
+ return d
+
+ def test_move_tree(self):
+ self.set_up_grid()
+
+ self.local_dir = abspath_expanduser_unicode(self.unicode_or_fallback(u"l\u00F8cal_dir", u"local_dir"),
+ base=self.basedir)
+ self.mkdir_nonascii(self.local_dir)
+
+ empty_tree_name = self.unicode_or_fallback(u"empty_tr\u00EAe", u"empty_tree")
+ empty_tree_dir = abspath_expanduser_unicode(empty_tree_name, base=self.basedir)
+ new_empty_tree_dir = abspath_expanduser_unicode(empty_tree_name, base=self.local_dir)
+
+ small_tree_name = self.unicode_or_fallback(u"small_tr\u00EAe", u"empty_tree")
+ small_tree_dir = abspath_expanduser_unicode(small_tree_name, base=self.basedir)
+ new_small_tree_dir = abspath_expanduser_unicode(small_tree_name, base=self.local_dir)
+
+ d = self.create_invite_join_magic_folder(u"Alice", self.local_dir)
+ d.addCallback(self._restart_client)
+
+ def _check_move_empty_tree(res):
+ #print "_check_move_empty_tree"
+ self.mkdir_nonascii(empty_tree_dir)
+ d2 = self.magicfolder.uploader.set_hook('processed')
+ os.rename(empty_tree_dir, new_empty_tree_dir)
+ self.notify(to_filepath(new_empty_tree_dir), self.inotify.IN_MOVED_TO)
+ return d2
+ d.addCallback(_check_move_empty_tree)
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded'), 1))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.files_uploaded'), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.directories_created'), 1))
+
+ def _check_move_small_tree(res):
+ #print "_check_move_small_tree"
+ self.mkdir_nonascii(small_tree_dir)
+ fileutil.write(abspath_expanduser_unicode(u"what", base=small_tree_dir), "say when")
+ d2 = self.magicfolder.uploader.set_hook('processed', ignore_count=1)
+ os.rename(small_tree_dir, new_small_tree_dir)
+ self.notify(to_filepath(new_small_tree_dir), self.inotify.IN_MOVED_TO)
+ return d2
+ d.addCallback(_check_move_small_tree)
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded'), 3))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.files_uploaded'), 1))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.directories_created'), 2))
+
+ def _check_moved_tree_is_watched(res):
+ #print "_check_moved_tree_is_watched"
+ d2 = self.magicfolder.uploader.set_hook('processed')
+ fileutil.write(abspath_expanduser_unicode(u"another", base=new_small_tree_dir), "file")
+ self.notify(to_filepath(abspath_expanduser_unicode(u"another", base=new_small_tree_dir)), self.inotify.IN_CLOSE_WRITE)
+ return d2
+ d.addCallback(_check_moved_tree_is_watched)
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded'), 4))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.files_uploaded'), 2))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.directories_created'), 2))
+
+ # Files that are moved out of the upload directory should no longer be watched.
+ #def _move_dir_away(ign):
+ # os.rename(new_empty_tree_dir, empty_tree_dir)
+ # # Wuh? Why don't we get this event for the real test?
+ # #self.notify(to_filepath(new_empty_tree_dir), self.inotify.IN_MOVED_FROM)
+ #d.addCallback(_move_dir_away)
+ #def create_file(val):
+ # test_file = abspath_expanduser_unicode(u"what", base=empty_tree_dir)
+ # fileutil.write(test_file, "meow")
+ # #self.notify(...)
+ # return
+ #d.addCallback(create_file)
+ #d.addCallback(lambda ign: time.sleep(1)) # XXX ICK
+ #d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
+ #d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded'), 4))
+ #d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.files_uploaded'), 2))
+ #d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0))
+ #d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.directories_created'), 2))
+
+ d.addBoth(self.cleanup)
+ return d
+
+ def test_persistence(self):
+ """
+ Perform an upload of a given file and then stop the client.
+ Start a new client and magic-folder service... and verify that the file is NOT uploaded
+ a second time. This test is meant to test the database persistence along with
+ the startup and shutdown code paths of the magic-folder service.
+ """
+ self.set_up_grid()
+ self.local_dir = abspath_expanduser_unicode(u"test_persistence", base=self.basedir)
+ self.mkdir_nonascii(self.local_dir)
+ self.collective_dircap = ""
+
+ d = defer.succeed(None)
+ d.addCallback(lambda ign: self.create_invite_join_magic_folder(u"Alice", self.local_dir))
+ d.addCallback(self._restart_client)
+
+ def create_test_file(filename):
+ d2 = self.magicfolder.uploader.set_hook('processed')
+ test_file = abspath_expanduser_unicode(filename, base=self.local_dir)
+ fileutil.write(test_file, "meow %s" % filename)
+ self.notify(to_filepath(test_file), self.inotify.IN_CLOSE_WRITE)
+ return d2
+ d.addCallback(lambda ign: create_test_file(u"what1"))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded'), 1))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0))
+ d.addCallback(self.cleanup)
+
+ d.addCallback(self._restart_client)
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded'), 1))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0))
+ d.addCallback(lambda ign: create_test_file(u"what2"))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded'), 2))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0))
+ d.addBoth(self.cleanup)
+ return d
+
+ def test_magic_folder(self):
+ self.set_up_grid()
+ self.local_dir = os.path.join(self.basedir, self.unicode_or_fallback(u"loc\u0101l_dir", u"local_dir"))
+ self.mkdir_nonascii(self.local_dir)
+
+ d = self.create_invite_join_magic_folder(u"Alice\u0101", self.local_dir)
+ d.addCallback(self._restart_client)
# Write something short enough for a LIT file.
- d.addCallback(lambda ign: self._test_file(u"short", "test"))
+ d.addCallback(lambda ign: self._check_file(u"short", "test"))
# Write to the same file again with different data.
- d.addCallback(lambda ign: self._test_file(u"short", "different"))
+ d.addCallback(lambda ign: self._check_file(u"short", "different"))
# Test that temporary files are not uploaded.
- d.addCallback(lambda ign: self._test_file(u"tempfile", "test", temporary=True))
+ d.addCallback(lambda ign: self._check_file(u"tempfile", "test", temporary=True))
# Test that we tolerate creation of a subdirectory.
d.addCallback(lambda ign: os.mkdir(os.path.join(self.local_dir, u"directory")))
# Write something longer, and also try to test a Unicode name if the fs can represent it.
name_u = self.unicode_or_fallback(u"l\u00F8ng", u"long")
- d.addCallback(lambda ign: self._test_file(name_u, "test"*100))
+ d.addCallback(lambda ign: self._check_file(name_u, "test"*100))
# TODO: test that causes an upload failure.
- d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_failed'), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
- # Prevent unclean reactor errors.
- def _cleanup(res):
- d = defer.succeed(None)
- if self.uploader is not None:
- d.addCallback(lambda ign: self.uploader.finish(for_tests=True))
- d.addCallback(lambda ign: res)
- return d
- d.addBoth(_cleanup)
+ d.addBoth(self.cleanup)
return d
- def _test_file(self, name_u, data, temporary=False):
- previously_uploaded = self._get_count('drop_upload.files_uploaded')
- previously_disappeared = self._get_count('drop_upload.files_disappeared')
-
- d = defer.Deferred()
+ def _check_file(self, name_u, data, temporary=False):
+ previously_uploaded = self._get_count('uploader.objects_succeeded')
+ previously_disappeared = self._get_count('uploader.objects_disappeared')
- # Note: this relies on the fact that we only get one IN_CLOSE_WRITE notification per file
- # (otherwise we would get a defer.AlreadyCalledError). Should we be relying on that?
- self.uploader.set_uploaded_callback(d.callback)
+ d = self.magicfolder.uploader.set_hook('processed')
path_u = abspath_expanduser_unicode(name_u, base=self.local_dir)
path = to_filepath(path_u)
f.close()
if temporary and sys.platform == "win32":
os.unlink(path_u)
+ self.notify(path, self.inotify.IN_DELETE)
fileutil.flush_volume(path_u)
- self.notify_close_write(path)
+ self.notify(path, self.inotify.IN_CLOSE_WRITE)
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
if temporary:
- d.addCallback(lambda ign: self.shouldFail(NoSuchChildError, 'temp file not uploaded', None,
- self.upload_dirnode.get, name_u))
- d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_disappeared'),
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_disappeared'),
previously_disappeared + 1))
else:
d.addCallback(lambda ign: self.upload_dirnode.get(name_u))
d.addCallback(download_to_data)
d.addCallback(lambda actual_data: self.failUnlessReallyEqual(actual_data, data))
- d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_uploaded'),
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded'),
previously_uploaded + 1))
- d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_queued'), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0))
+ return d
+
+ def _check_version_in_dmd(self, magicfolder, relpath_u, expected_version):
+ encoded_name_u = magicpath.path2magic(relpath_u)
+ d = magicfolder.downloader._get_collective_latest_file(encoded_name_u)
+ def check_latest(result):
+ if result[0] is not None:
+ node, metadata = result
+ d.addCallback(lambda ign: self.failUnlessEqual(metadata['version'], expected_version))
+ d.addCallback(check_latest)
return d
+ def _check_version_in_local_db(self, magicfolder, relpath_u, expected_version):
+ version = magicfolder._db.get_local_file_version(relpath_u)
+ #print "_check_version_in_local_db: %r has version %s" % (relpath_u, version)
+ self.failUnlessEqual(version, expected_version)
+
+ def test_alice_bob(self):
+ d = self.setup_alice_and_bob()
+ def get_results(result):
+ # XXX are these used?
+ (self.alice_collective_dircap, self.alice_upload_dircap, self.alice_magicfolder,
+ self.bob_collective_dircap, self.bob_upload_dircap, self.bob_magicfolder) = result
+ #print "Alice magicfolderdb is at %r" % (self.alice_magicfolder._client.basedir)
+ #print "Bob magicfolderdb is at %r" % (self.bob_magicfolder._client.basedir)
+ d.addCallback(get_results)
+
+ def Alice_write_a_file(result):
+ print "Alice writes a file\n"
+ self.file_path = abspath_expanduser_unicode(u"file1", base=self.alice_magicfolder.uploader._local_path_u)
+ fileutil.write(self.file_path, "meow, meow meow. meow? meow meow! meow.")
+ self.magicfolder = self.alice_magicfolder
+ self.notify(to_filepath(self.file_path), self.inotify.IN_CLOSE_WRITE)
+
+ d.addCallback(Alice_write_a_file)
+
+ def Alice_wait_for_upload(result):
+ print "Alice waits for an upload\n"
+ d2 = self.alice_magicfolder.uploader.set_hook('processed')
+ return d2
+ d.addCallback(Alice_wait_for_upload)
+ d.addCallback(lambda ign: self._check_version_in_dmd(self.alice_magicfolder, u"file1", 0))
+ d.addCallback(lambda ign: self._check_version_in_local_db(self.alice_magicfolder, u"file1", 0))
+
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded', client=self.alice_magicfolder._client), 1))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.files_uploaded', client=self.alice_magicfolder._client), 1))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued', client=self.alice_magicfolder._client), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.directories_created', client=self.alice_magicfolder._client), 0))
+
+ def Bob_wait_for_download(result):
+ print "Bob waits for a download\n"
+ d2 = self.bob_magicfolder.downloader.set_hook('processed')
+ return d2
+ d.addCallback(Bob_wait_for_download)
+ d.addCallback(lambda ign: self._check_version_in_local_db(self.bob_magicfolder, u"file1", 0))
+ d.addCallback(lambda ign: self._check_version_in_dmd(self.bob_magicfolder, u"file1", 0)) # XXX prolly not needed
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('downloader.objects_failed'), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('downloader.objects_downloaded', client=self.bob_magicfolder._client), 1))
+
+
+ # test deletion of file behavior
+ def Alice_delete_file(result):
+ print "Alice deletes the file!\n"
+ os.unlink(self.file_path)
+ self.notify(to_filepath(self.file_path), self.inotify.IN_DELETE)
+
+ return None
+ d.addCallback(Alice_delete_file)
+ d.addCallback(Alice_wait_for_upload)
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded', client=self.alice_magicfolder._client), 2))
+ d.addCallback(lambda ign: self._check_version_in_dmd(self.alice_magicfolder, u"file1", 1))
+ d.addCallback(lambda ign: self._check_version_in_local_db(self.alice_magicfolder, u"file1", 1))
+
+ d.addCallback(Bob_wait_for_download)
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('downloader.objects_failed'), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('downloader.objects_downloaded', client=self.bob_magicfolder._client), 2))
+ d.addCallback(lambda ign: self._check_version_in_local_db(self.bob_magicfolder, u"file1", 1))
+ d.addCallback(lambda ign: self._check_version_in_dmd(self.bob_magicfolder, u"file1", 1))
+
+ def Alice_rewrite_file(result):
+ print "Alice rewrites file\n"
+ self.file_path = abspath_expanduser_unicode(u"file1", base=self.alice_magicfolder.uploader._local_path_u)
+ fileutil.write(self.file_path, "Alice suddenly sees the white rabbit running into the forest.")
+ self.magicfolder = self.alice_magicfolder
+ self.notify(to_filepath(self.file_path), self.inotify.IN_CLOSE_WRITE)
+
+ d.addCallback(Alice_rewrite_file)
+ d.addCallback(lambda ign: self._check_version_in_local_db(self.bob_magicfolder, u"file1", 1))
+ d.addCallback(Alice_wait_for_upload)
+
+ d.addCallback(lambda ign: self._check_version_in_dmd(self.alice_magicfolder, u"file1", 2))
+ d.addCallback(lambda ign: self._check_version_in_local_db(self.alice_magicfolder, u"file1", 2))
+
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded', client=self.alice_magicfolder._client), 3))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.files_uploaded', client=self.alice_magicfolder._client), 3))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued', client=self.alice_magicfolder._client), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.directories_created', client=self.alice_magicfolder._client), 0))
+
+ d.addCallback(Bob_wait_for_download)
+ d.addCallback(lambda ign: self._check_version_in_dmd(self.bob_magicfolder, u"file1", 2))
+ d.addCallback(lambda ign: self._check_version_in_local_db(self.bob_magicfolder, u"file1", 2))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('downloader.objects_failed'), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('downloader.objects_downloaded', client=self.bob_magicfolder._client), 3))
+
+ def cleanup_Alice_and_Bob(result):
+ print "cleanup alice bob test\n"
+ d = defer.succeed(None)
+ d.addCallback(lambda ign: self.alice_magicfolder.finish())
+ d.addCallback(lambda ign: self.bob_magicfolder.finish())
+ d.addCallback(lambda ign: result)
+ return d
+ d.addCallback(cleanup_Alice_and_Bob)
+ return d
class MockTest(MagicFolderTestMixin, unittest.TestCase):
"""This can run on any platform, and even if twisted.internet.inotify can't be imported."""
+ def setUp(self):
+ MagicFolderTestMixin.setUp(self)
+ self.inotify = fake_inotify
+ self.patch(magic_folder, 'get_inotify_module', lambda: self.inotify)
+
+ def notify(self, path, mask):
+ self.magicfolder.uploader._notifier.event(path, mask)
+
def test_errors(self):
self.set_up_grid()
client = self.g.clients[0]
d = client.create_dirnode()
- def _made_upload_dir(n):
+ def _check_errors(n):
self.failUnless(IDirectoryNode.providedBy(n))
upload_dircap = n.get_uri()
readonly_dircap = n.get_readonly_uri()
self.shouldFail(AssertionError, 'nonexistent local.directory', 'there is no directory',
- MagicFolder, client, upload_dircap, doesnotexist, inotify=fake_inotify)
+ MagicFolder, client, upload_dircap, '', doesnotexist, magicfolderdb)
self.shouldFail(AssertionError, 'non-directory local.directory', 'is not a directory',
- MagicFolder, client, upload_dircap, not_a_dir, inotify=fake_inotify)
+ MagicFolder, client, upload_dircap, '', not_a_dir, magicfolderdb)
self.shouldFail(AssertionError, 'bad upload.dircap', 'does not refer to a directory',
- MagicFolder, client, 'bad', errors_dir, inotify=fake_inotify)
+ MagicFolder, client, 'bad', '', errors_dir, magicfolderdb)
self.shouldFail(AssertionError, 'non-directory upload.dircap', 'does not refer to a directory',
- MagicFolder, client, 'URI:LIT:foo', errors_dir, inotify=fake_inotify)
+ MagicFolder, client, 'URI:LIT:foo', '', errors_dir, magicfolderdb)
self.shouldFail(AssertionError, 'readonly upload.dircap', 'is not a writecap to a directory',
- MagicFolder, client, readonly_dircap, errors_dir, inotify=fake_inotify)
- d.addCallback(_made_upload_dir)
+ MagicFolder, client, readonly_dircap, '', errors_dir, magicfolderdb,)
+ self.shouldFail(AssertionError, 'collective dircap',
+ "The URI in 'private/collective_dircap' is not a readonly cap to a directory.",
+ MagicFolder, client, upload_dircap, upload_dircap, errors_dir, magicfolderdb)
+
+ def _not_implemented():
+ raise NotImplementedError("blah")
+ self.patch(magic_folder, 'get_inotify_module', _not_implemented)
+ self.shouldFail(NotImplementedError, 'unsupported', 'blah',
+ MagicFolder, client, upload_dircap, '', errors_dir, magicfolderdb)
+ d.addCallback(_check_errors)
return d
- def test_drop_upload(self):
- self.inotify = fake_inotify
- self.basedir = "drop_upload.MockTest.test_drop_upload"
- return self._test()
+ def test_write_downloaded_file(self):
+ workdir = u"cli/MagicFolder/write-downloaded-file"
+ local_file = fileutil.abspath_expanduser_unicode(os.path.join(workdir, "foobar"))
+
+ # create a file with name "foobar" with content "foo"
+ # write downloaded file content "bar" into "foobar" with is_conflict = False
+ fileutil.make_dirs(workdir)
+ fileutil.write(local_file, "foo")
+
+ # if is_conflict is False, then the .conflict file shouldn't exist.
+ Downloader._write_downloaded_file(local_file, "bar", False, None)
+ conflicted_path = local_file + u".conflict"
+ self.failIf(os.path.exists(conflicted_path))
- def notify_close_write(self, path):
- self.uploader._notifier.event(path, self.inotify.IN_CLOSE_WRITE)
+ # At this point, the backup file should exist with content "foo"
+ backup_path = local_file + u".backup"
+ self.failUnless(os.path.exists(backup_path))
+ self.failUnlessEqual(fileutil.read(backup_path), "foo")
+
+ # .tmp file shouldn't exist
+ self.failIf(os.path.exists(local_file + u".tmp"))
+
+ # .. and the original file should have the new content
+ self.failUnlessEqual(fileutil.read(local_file), "bar")
+
+ # now a test for conflicted case
+ Downloader._write_downloaded_file(local_file, "bar", True, None)
+ self.failUnless(os.path.exists(conflicted_path))
+
+ # .tmp file shouldn't exist
+ self.failIf(os.path.exists(local_file + u".tmp"))
class RealTest(MagicFolderTestMixin, unittest.TestCase):
"""This is skipped unless both Twisted and the platform support inotify."""
- def test_drop_upload(self):
- self.inotify = None # use the appropriate inotify for the platform
- self.basedir = "drop_upload.RealTest.test_drop_upload"
- return self._test()
+ def setUp(self):
+ MagicFolderTestMixin.setUp(self)
+ self.inotify = magic_folder.get_inotify_module()
- def notify_close_write(self, path):
- # Writing to the file causes the notification.
+ def notify(self, path, mask):
+ # Writing to the filesystem causes the notification.
pass
-if sys.platform != "win32" and not runtime.platform.supportsINotify():
+try:
+ magic_folder.get_inotify_module()
+except NotImplementedError:
RealTest.skip = "Magic Folder support can only be tested for-real on an OS that supports inotify or equivalent."
--- /dev/null
+
+from twisted.trial import unittest
+
+from allmydata import magicpath
+
+
+class MagicPath(unittest.TestCase):
+ tests = {
+ u"Documents/work/critical-project/qed.txt": u"Documents@_work@_critical-project@_qed.txt",
+ u"Documents/emails/bunnyfufu@hoppingforest.net": u"Documents@_emails@_bunnyfufu@@hoppingforest.net",
+ u"foo/@/bar": u"foo@_@@@_bar",
+ }
+
+ def test_path2magic(self):
+ for test, expected in self.tests.items():
+ self.failUnlessEqual(magicpath.path2magic(test), expected)
+
+ def test_magic2path(self):
+ for expected, test in self.tests.items():
+ self.failUnlessEqual(magicpath.magic2path(test), expected)
+
+ def test_should_ignore(self):
+ self.failUnlessEqual(magicpath.should_ignore_file(u".bashrc"), True)
+ self.failUnlessEqual(magicpath.should_ignore_file(u"bashrc."), False)
+ self.failUnlessEqual(magicpath.should_ignore_file(u"forest/tree/branch/.bashrc"), True)
+ self.failUnlessEqual(magicpath.should_ignore_file(u"forest/tree/.branch/bashrc"), True)
+ self.failUnlessEqual(magicpath.should_ignore_file(u"forest/.tree/branch/bashrc"), True)
+ self.failUnlessEqual(magicpath.should_ignore_file(u"forest/tree/branch/bashrc"), False)
self.failIf(os.path.exists(fn))
self.failUnless(os.path.exists(fn2))
+ def test_rename_no_overwrite(self):
+ workdir = fileutil.abspath_expanduser_unicode(u"test_rename_no_overwrite")
+ fileutil.make_dirs(workdir)
+
+ source_path = os.path.join(workdir, "source")
+ dest_path = os.path.join(workdir, "dest")
+
+ # when neither file exists
+ self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
+
+ # when only dest exists
+ fileutil.write(dest_path, "dest")
+ self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
+ self.failUnlessEqual(fileutil.read(dest_path), "dest")
+
+ # when both exist
+ fileutil.write(source_path, "source")
+ self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
+ self.failUnlessEqual(fileutil.read(source_path), "source")
+ self.failUnlessEqual(fileutil.read(dest_path), "dest")
+
+ # when only source exists
+ os.remove(dest_path)
+ fileutil.rename_no_overwrite(source_path, dest_path)
+ self.failUnlessEqual(fileutil.read(dest_path), "source")
+ self.failIf(os.path.exists(source_path))
+
+ def test_replace_file(self):
+ workdir = fileutil.abspath_expanduser_unicode(u"test_replace_file")
+ fileutil.make_dirs(workdir)
+
+ backup_path = os.path.join(workdir, "backup")
+ replaced_path = os.path.join(workdir, "replaced")
+ replacement_path = os.path.join(workdir, "replacement")
+
+ # when none of the files exist
+ self.failUnlessRaises(fileutil.ConflictError, fileutil.replace_file, replaced_path, replacement_path, backup_path)
+
+ # when only replaced exists
+ fileutil.write(replaced_path, "foo")
+ self.failUnlessRaises(fileutil.ConflictError, fileutil.replace_file, replaced_path, replacement_path, backup_path)
+ self.failUnlessEqual(fileutil.read(replaced_path), "foo")
+
+ # when both replaced and replacement exist, but not backup
+ fileutil.write(replacement_path, "bar")
+ fileutil.replace_file(replaced_path, replacement_path, backup_path)
+ self.failUnlessEqual(fileutil.read(backup_path), "foo")
+ self.failUnlessEqual(fileutil.read(replaced_path), "bar")
+ self.failIf(os.path.exists(replacement_path))
+
+ # when only replacement exists
+ os.remove(backup_path)
+ os.remove(replaced_path)
+ fileutil.write(replacement_path, "bar")
+ fileutil.replace_file(replaced_path, replacement_path, backup_path)
+ self.failUnlessEqual(fileutil.read(replaced_path), "bar")
+ self.failIf(os.path.exists(replacement_path))
+ self.failIf(os.path.exists(backup_path))
+
+ # when replaced, replacement and backup all exist
+ fileutil.write(replaced_path, "foo")
+ fileutil.write(replacement_path, "bar")
+ fileutil.write(backup_path, "bak")
+ fileutil.replace_file(replaced_path, replacement_path, backup_path)
+ self.failUnlessEqual(fileutil.read(backup_path), "foo")
+ self.failUnlessEqual(fileutil.read(replaced_path), "bar")
+ self.failIf(os.path.exists(replacement_path))
+
def test_du(self):
basedir = "util/FileUtil/test_du"
fileutil.make_dirs(basedir)
disk = fileutil.get_disk_stats('.', 2**128)
self.failUnlessEqual(disk['avail'], 0)
+ def test_get_pathinfo(self):
+ basedir = "util/FileUtil/test_get_pathinfo"
+ fileutil.make_dirs(basedir)
+
+ # create a directory
+ self.mkdir(basedir, "a")
+ dirinfo = fileutil.get_pathinfo(basedir)
+ self.failUnlessTrue(dirinfo.isdir)
+ self.failUnlessTrue(dirinfo.exists)
+ self.failUnlessFalse(dirinfo.isfile)
+ self.failUnlessFalse(dirinfo.islink)
+
+ # create a file under the directory
+ f = os.path.join(basedir, "a", "1.txt")
+ self.touch(basedir, "a/1.txt", data="a"*10)
+ fileinfo = fileutil.get_pathinfo(f)
+ self.failUnlessTrue(fileinfo.isfile)
+ self.failUnlessTrue(fileinfo.exists)
+ self.failUnlessFalse(fileinfo.isdir)
+ self.failUnlessFalse(fileinfo.islink)
+ self.failUnlessEqual(fileinfo.size, 10)
+
+ # create a symlink under the directory a pointing to 1.txt
+ slname = os.path.join(basedir, "a", "linkto1.txt")
+ os.symlink(f, slname)
+ symlinkinfo = fileutil.get_pathinfo(slname)
+ self.failUnlessTrue(symlinkinfo.islink)
+ self.failUnlessTrue(symlinkinfo.exists)
+ self.failUnlessFalse(symlinkinfo.isfile)
+ self.failUnlessFalse(symlinkinfo.isdir)
+
+ # path at which nothing exists
+ dnename = os.path.join(basedir, "a", "doesnotexist")
+ now = time.time()
+ dneinfo = fileutil.get_pathinfo(dnename, now=now)
+ self.failUnlessFalse(dneinfo.exists)
+ self.failUnlessFalse(dneinfo.isfile)
+ self.failUnlessFalse(dneinfo.isdir)
+ self.failUnlessFalse(dneinfo.islink)
+ self.failUnlessEqual(dneinfo.size, None)
+ self.failUnlessEqual(dneinfo.mtime, now)
+ self.failUnlessEqual(dneinfo.ctime, now)
+
+
class PollMixinTests(unittest.TestCase):
def setUp(self):
self.pm = pollmixin.PollMixin()
from twisted.internet import defer, reactor
from allmydata.util import log
+from allmydata.util.assertutil import _assert
from allmydata.util.pollmixin import PollMixin
I am a helper mixin that maintains a collection of named hooks, primarily
for use in tests. Each hook is set to an unfired Deferred using 'set_hook',
and can then be fired exactly once at the appropriate time by '_call_hook'.
+ If 'ignore_count' is given, that number of calls to '_call_hook' will be
+ ignored before firing the hook.
I assume a '_hooks' attribute that should set by the class constructor to
a dict mapping each valid hook name to None.
"""
- def set_hook(self, name, d=None):
+ def set_hook(self, name, d=None, ignore_count=0):
"""
Called by the hook observer (e.g. by a test).
If d is not given, an unfired Deferred is created and returned.
The hook must not already be set.
"""
+ self._log("set_hook %r, ignore_count=%r" % (name, ignore_count))
if d is None:
d = defer.Deferred()
- assert self._hooks[name] is None, self._hooks[name]
- assert isinstance(d, defer.Deferred), d
- self._hooks[name] = d
+ _assert(ignore_count >= 0, ignore_count=ignore_count)
+ _assert(name in self._hooks, name=name)
+ _assert(self._hooks[name] is None, name=name, hook=self._hooks[name])
+ _assert(isinstance(d, defer.Deferred), d=d)
+
+ self._hooks[name] = (d, ignore_count)
return d
def _call_hook(self, res, name):
"""
- Called to trigger the hook, with argument 'res'. This is a no-op if the
- hook is unset. Otherwise, the hook will be unset, and then its Deferred
- will be fired synchronously.
+ Called to trigger the hook, with argument 'res'. This is a no-op if
+ the hook is unset. If the hook's ignore_count is positive, it will be
+ decremented; if it was already zero, the hook will be unset, and then
+ its Deferred will be fired synchronously.
The expected usage is "deferred.addBoth(self._call_hook, 'hookname')".
This ensures that if 'res' is a failure, the hook will be errbacked,
'res' is returned so that the current result or failure will be passed
through.
"""
- d = self._hooks[name]
- if d is None:
+ hook = self._hooks[name]
+ if hook is None:
return defer.succeed(None)
- self._hooks[name] = None
- _with_log(d.callback, res)
+
+ (d, ignore_count) = hook
+ self._log("call_hook %r, ignore_count=%r" % (name, ignore_count))
+ if ignore_count > 0:
+ self._hooks[name] = (d, ignore_count - 1)
+ else:
+ self._hooks[name] = None
+ _with_log(d.callback, res)
return res
+ def _log(self, msg):
+ log.msg(msg, level=log.NOISY)
+
def async_iterate(process, iterable, *extra_args, **kwargs):
"""
import sys, os, re, locale
from types import NoneType
-from allmydata.util.assertutil import precondition
+from allmydata.util.assertutil import precondition, _assert
from twisted.python import usage
from twisted.python.filepath import FilePath
from allmydata.util import log
is_unicode_platform = sys.platform in ["win32", "darwin"]
- use_unicode_filepath = sys.platform == "win32" or hasattr(FilePath, '_asTextPath')
+ # Despite the Unicode-mode FilePath support added to Twisted in
+ # <https://twistedmatrix.com/trac/ticket/7805>, we can't yet use
+ # Unicode-mode FilePaths with INotify on non-Windows platforms
+ # due to <https://twistedmatrix.com/trac/ticket/7928>.
+ use_unicode_filepath = sys.platform == "win32"
_reload()
return quote_output(path, quotemarks=quotemarks, quote_newlines=True)
+def quote_filepath(path, quotemarks=True):
+ return quote_local_unicode_path(unicode_from_filepath(path), quotemarks=quotemarks)
+
+def extend_filepath(fp, segments):
+ # We cannot use FilePath.preauthChild, because
+ # * it has the security flaw described in <https://twistedmatrix.com/trac/ticket/6527>;
+ # * it may return a FilePath in the wrong mode.
+
+ for segment in segments:
+ fp = fp.child(segment)
+
+ if isinstance(fp.path, unicode) and not use_unicode_filepath:
+ return FilePath(fp.path.encode(filesystem_encoding))
+ else:
+ return fp
+
def to_filepath(path):
precondition(isinstance(path, basestring), path=path)
return FilePath(path)
+def _decode(s):
+ precondition(isinstance(s, basestring), s=s)
+
+ if isinstance(s, bytes):
+ return s.decode(filesystem_encoding)
+ else:
+ return s
+
def unicode_from_filepath(fp):
precondition(isinstance(fp, FilePath), fp=fp)
+ return _decode(fp.path)
- path = fp.path
- if isinstance(path, bytes):
- path = path.decode(filesystem_encoding)
-
- return path
+def unicode_segments_from(base_fp, ancestor_fp):
+ precondition(isinstance(base_fp, FilePath), base_fp=base_fp)
+ precondition(isinstance(ancestor_fp, FilePath), ancestor_fp=ancestor_fp)
+ if hasattr(FilePath, 'asTextMode'):
+ return base_fp.asTextMode().segmentsFrom(ancestor_fp.asTextMode())
+ else:
+ bpt, apt = (type(base_fp.path), type(ancestor_fp.path))
+ _assert(bpt == apt, bpt=bpt, apt=apt)
+ return map(_decode, base_fp.segmentsFrom(ancestor_fp))
def unicode_platform():
"""
return os.listdir(path)
else:
return listdir_unicode_fallback(path)
+
+def listdir_filepath(fp):
+ return listdir_unicode(unicode_from_filepath(fp))
"""
import sys, exceptions, os, stat, tempfile, time, binascii
+from collections import namedtuple
+from errno import ENOENT
if sys.platform == "win32":
from ctypes import WINFUNCTYPE, WinError, windll, POINTER, byref, c_ulonglong, \
if sys.platform == "win32":
- from ctypes import WINFUNCTYPE, windll, WinError
- from ctypes.wintypes import BOOL, HANDLE, DWORD, LPCWSTR, LPVOID
+ from ctypes.wintypes import BOOL, HANDLE, DWORD, LPCWSTR, LPVOID, WinError, get_last_error
# <http://msdn.microsoft.com/en-us/library/aa363858%28v=vs.85%29.aspx>
CreateFileW = WINFUNCTYPE(HANDLE, LPCWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE) \
def flush_volume(path):
# use sync()?
pass
+
+
+class ConflictError(Exception):
+ pass
+
+class UnableToUnlinkReplacementError(Exception):
+ pass
+
+def reraise(wrapper):
+ _, exc, tb = sys.exc_info()
+ wrapper_exc = wrapper("%s: %s" % (exc.__class__.__name__, exc))
+ raise wrapper_exc.__class__, wrapper_exc, tb
+
+if sys.platform == "win32":
+ from ctypes import WINFUNCTYPE, windll, WinError, get_last_error
+ from ctypes.wintypes import BOOL, DWORD, LPCWSTR, LPVOID
+
+ # <https://msdn.microsoft.com/en-us/library/windows/desktop/aa365512%28v=vs.85%29.aspx>
+ ReplaceFileW = WINFUNCTYPE(
+ BOOL,
+ LPCWSTR, LPCWSTR, LPCWSTR, DWORD, LPVOID, LPVOID,
+ use_last_error=True
+ )(("ReplaceFileW", windll.kernel32))
+
+ REPLACEFILE_IGNORE_MERGE_ERRORS = 0x00000002
+
+ def rename_no_overwrite(source_path, dest_path):
+ os.rename(source_path, dest_path)
+
+ def replace_file(replaced_path, replacement_path, backup_path):
+ precondition_abspath(replaced_path)
+ precondition_abspath(replacement_path)
+ precondition_abspath(backup_path)
+
+ r = ReplaceFileW(replaced_path, replacement_path, backup_path,
+ REPLACEFILE_IGNORE_MERGE_ERRORS, None, None)
+ if r == 0:
+ # The UnableToUnlinkReplacementError case does not happen on Windows;
+ # all errors should be treated as signalling a conflict.
+ err = get_last_error()
+ raise ConflictError("WinError: %s" % (WinError(err)))
+else:
+ def rename_no_overwrite(source_path, dest_path):
+ # link will fail with EEXIST if there is already something at dest_path.
+ os.link(source_path, dest_path)
+ try:
+ os.unlink(source_path)
+ except EnvironmentError:
+ reraise(UnableToUnlinkReplacementError)
+
+ def replace_file(replaced_path, replacement_path, backup_path):
+ precondition_abspath(replaced_path)
+ precondition_abspath(replacement_path)
+ precondition_abspath(backup_path)
+
+ if not os.path.exists(replacement_path):
+ raise ConflictError("Replacement file not found: %r" % (replacement_path,))
+
+ try:
+ os.rename(replaced_path, backup_path)
+ except OSError as e:
+ if e.errno != ENOENT:
+ raise
+ try:
+ rename_no_overwrite(replacement_path, replaced_path)
+ except EnvironmentError:
+ reraise(ConflictError)
+
+PathInfo = namedtuple('PathInfo', 'isdir isfile islink exists size mtime ctime')
+
+def get_pathinfo(path_u, now=None):
+ try:
+ statinfo = os.lstat(path_u)
+ mode = statinfo.st_mode
+ return PathInfo(isdir =stat.S_ISDIR(mode),
+ isfile=stat.S_ISREG(mode),
+ islink=stat.S_ISLNK(mode),
+ exists=True,
+ size =statinfo.st_size,
+ mtime =statinfo.st_mtime,
+ ctime =statinfo.st_ctime,
+ )
+ except OSError as e:
+ if e.errno == ENOENT:
+ if now is None:
+ now = time.time()
+ return PathInfo(isdir =False,
+ isfile=False,
+ islink=False,
+ exists=False,
+ size =None,
+ mtime =now,
+ ctime =now,
+ )
+ raise