-import sys, os, stat
+import sys, os
import os.path
from collections import deque
import time
def __init__(self, client, local_path_u, db, name):
self._client = client
self._local_path_u = local_path_u
- self._local_path = to_filepath(local_path_u)
+ self._local_filepath = to_filepath(local_path_u)
self._db = db
self._name = name
self._hooks = {'processed': None, 'started': None}
self.started_d = self.set_hook('started')
- if not self._local_path.exists():
+ if not self._local_filepath.exists():
raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
"but there is no directory at that location."
% quote_local_unicode_path(self._local_path_u))
- if not self._local_path.isdir():
+ if not self._local_filepath.isdir():
raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
"but the thing at that location is not a directory."
% quote_local_unicode_path(self._local_path_u))
self._stopped = False
self._turn_delay = 0
+ def _get_abspath(self, relpath_u):
+ return unicode_from_filepath(self._local_filepath.preauthChild(relpath_u))
+
+ def _get_relpath(self, filepath):
+ return u"/".join(filepath.segmentsFrom(self._local_filepath))
+
def _count(self, counter_name, delta=1):
self._client.stats_provider.count('magic_folder.%s.%s' % (self._name, counter_name), delta)
#print s
#open("events", "ab+").write(msg)
- def _append_to_deque(self, path):
- if path in self._pending:
+ def _append_to_deque(self, relpath_u):
+ print "_append_to_deque(%r)" % (relpath_u,)
+ if relpath_u in self._pending:
return
- self._deque.append(path)
- self._pending.add(path)
+ self._deque.append(relpath_u)
+ self._pending.add(relpath_u)
self._count('objects_queued')
if self.is_ready:
reactor.callLater(0, self._turn_deque)
if self._stopped:
return
try:
- item = self._deque.pop()
+ relpath_u = self._deque.pop()
except IndexError:
self._log("deque is now empty")
self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
else:
- self._lazy_tail.addCallback(lambda ign: self._process(item))
+ self._lazy_tail.addCallback(lambda ign: self._process(relpath_u))
self._lazy_tail.addBoth(self._call_hook, 'processed')
self._lazy_tail.addErrback(log.err)
self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._turn_delay, self._turn_deque))
| self._inotify.IN_ONLYDIR
| IN_EXCL_UNLINK
)
- self._notifier.watch(self._local_path, mask=self.mask, callbacks=[self._notify],
+ self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
recursive=True)
def start_monitoring(self):
def start_scanning(self):
self._log("start_scanning")
self.is_ready = True
- all_files = self._db.get_all_files()
- d = self._scan(self._local_path_u)
- self._turn_deque()
+ self._pending = self._db.get_all_relpaths()
+ print "all_files %r" % (self._pending)
+ d = self._scan(u"")
+ def _add_pending(ign):
+ # This adds all of the files that were in the db but not already processed
+ # (normally because they have been deleted on disk).
+ print "adding %r" % (self._pending)
+ self._deque.extend(self._pending)
+ d.addCallback(_add_pending)
+ d.addCallback(lambda ign: self._turn_deque())
return d
- def _scan(self, local_path_u): # XXX should this take a FilePath?
- self._log("scan %r" % (local_path_u))
- if not os.path.isdir(local_path_u):
- raise AssertionError("Programmer error: _scan() must be passed a directory path.")
- quoted_path = quote_local_unicode_path(local_path_u)
+ def _scan(self, reldir_u):
+ self._log("scan %r" % (reldir_u,))
+ abspath_u = self._get_abspath(reldir_u)
try:
- children = listdir_unicode(local_path_u)
+ children = listdir_unicode(abspath_u)
except EnvironmentError:
- raise(Exception("WARNING: magic folder: permission denied on directory %s" % (quoted_path,)))
+ raise Exception("WARNING: magic folder: permission denied on directory %s"
+ % quote_local_unicode_path(abspath_u))
except FilenameEncodingError:
- raise(Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error" % (quoted_path,)))
+ raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
+ % quote_local_unicode_path(abspath_u))
d = defer.succeed(None)
for child in children:
assert isinstance(child, unicode), child
- d.addCallback(lambda ign, child=child: os.path.join(local_path_u, child))
+ d.addCallback(lambda ign, child=child: os.path.join(reldir_u, child))
d.addCallback(self._process_child)
d.addErrback(log.err)
def _notify(self, opaque, path, events_mask):
self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
- path_u = unicode_from_filepath(path)
- self._append_to_deque(path_u)
+ relpath_u = self._get_relpath(path)
+ self._append_to_deque(relpath_u)
def _when_queue_is_empty(self):
return defer.succeed(None)
- def _process_child(self, path_u):
- precondition(isinstance(path_u, unicode), path_u)
+ def _process_child(self, relpath_u):
+ precondition(isinstance(relpath_u, unicode), relpath_u)
- pathinfo = get_pathinfo(path_u)
+ abspath_u = self._get_abspath(relpath_u)
+ pathinfo = get_pathinfo(abspath_u)
if pathinfo.islink:
- self.warn("WARNING: cannot backup symlink %s" % quote_local_unicode_path(path_u))
+ self.warn("WARNING: cannot backup symlink %s" % quote_local_unicode_path(abspath_u))
return None
elif pathinfo.isdir:
# process directories unconditionally
- self._append_to_deque(path_u)
+ self._append_to_deque(relpath_u)
# recurse on the child directory
- return self._scan(path_u)
+ return self._scan(relpath_u)
elif pathinfo.isfile:
- file_version = self._db.get_local_file_version(path_u)
+ file_version = self._db.get_local_file_version(relpath_u)
if file_version is None:
# XXX upload if we didn't record our version in magicfolder db?
- self._append_to_deque(path_u)
+ self._append_to_deque(relpath_u)
return None
else:
- d2 = self._get_collective_latest_file(path_u)
+ d2 = self._get_collective_latest_file(relpath_u)
def _got_latest_file((file_node, metadata)):
collective_version = metadata['version']
if collective_version is None:
return None
if file_version > collective_version:
- self._append_to_upload_deque(path_u)
+ self._append_to_upload_deque(relpath_u)
elif file_version < collective_version: # FIXME Daira thinks this is wrong
# if a collective version of the file is newer than ours
# we must download it and unlink the old file from our upload dirnode
- self._append_to_download_deque(path_u)
+ self._append_to_download_deque(relpath_u)
# XXX where should we save the returned deferred?
- return self._upload_dirnode.delete(path_u, must_be_file=True)
+ return self._upload_dirnode.delete(relpath_u, must_be_file=True)
else:
# XXX same version. do nothing.
pass
d2.addCallback(_got_latest_file)
return d2
else:
- self.warn("WARNING: cannot backup special file %s" % quote_local_unicode_path(path_u))
+ self.warn("WARNING: cannot backup special file %s" % quote_local_unicode_path(abspath_u))
return None
- def _process(self, path_u):
- precondition(isinstance(path_u, unicode), path_u)
+ def _process(self, relpath_u):
+ precondition(isinstance(relpath_u, unicode), relpath_u)
d = defer.succeed(None)
def _maybe_upload(val):
- pathinfo = get_pathinfo(path_u)
+ abspath_u = self._get_abspath(relpath_u)
+ pathinfo = get_pathinfo(abspath_u)
- self._pending.remove(path_u) # FIXME make _upload_pending hold relative paths
- relpath_u = os.path.relpath(path_u, self._local_path_u)
+ self._pending.remove(relpath_u)
encoded_name_u = magicpath.path2magic(relpath_u)
if not pathinfo.exists:
- self._log("drop-upload: notified object %r disappeared "
- "(this is normal for temporary objects)" % (path_u,))
+ self._log("notified object %s disappeared (this is normal)" % quote_local_unicode_path(abspath_u))
self._count('objects_disappeared')
d2 = defer.succeed(None)
if self._db.check_file_db_exists(relpath_u):
d2.addCallback(lambda x: Exception("file does not exist")) # FIXME wrong
return d2
elif pathinfo.islink:
- self.warn("WARNING: cannot upload symlink %s" % quote_local_unicode_path(path_u))
+ self.warn("WARNING: cannot upload symlink %s" % quote_local_unicode_path(abspath_u))
return None
elif pathinfo.isdir:
- self._notifier.watch(to_filepath(path_u), mask=self.mask, callbacks=[self._notify], recursive=True)
+ self._notifier.watch(to_filepath(abspath_u), mask=self.mask, callbacks=[self._notify], recursive=True)
uploadable = Data("", self._client.convergence)
encoded_name_u += u"@_"
upload_d = self._upload_dirnode.add_file(encoded_name_u, uploadable, metadata={"version":0}, overwrite=True)
def _succeeded(ign):
- self._log("created subdirectory %r" % (path_u,))
+ self._log("created subdirectory %r" % (relpath_u,))
self._count('directories_created')
def _failed(f):
- self._log("failed to create subdirectory %r" % (path_u,))
+ self._log("failed to create subdirectory %r" % (relpath_u,))
return f
upload_d.addCallbacks(_succeeded, _failed)
- upload_d.addCallback(lambda ign: self._scan(path_u))
+ upload_d.addCallback(lambda ign: self._scan(relpath_u))
return upload_d
elif pathinfo.isfile:
version = self._db.get_local_file_version(relpath_u)
if version is None:
version = 0
- else:
- if self._db.is_new_file_time(os.path.join(self._local_path_u, relpath_u), relpath_u):
- version += 1
+ elif self._db.is_new_file_time(abspath_u, relpath_u):
+ version += 1
- uploadable = FileName(path_u, self._client.convergence)
+ uploadable = FileName(abspath_u, self._client.convergence)
d2 = self._upload_dirnode.add_file(encoded_name_u, uploadable, metadata={"version":version}, overwrite=True)
def add_db_entry(filenode):
filecap = filenode.get_uri()
d2.addCallback(add_db_entry)
return d2
else:
- self.warn("WARNING: cannot process special file %s" % quote_local_unicode_path(path_u))
+ self.warn("WARNING: cannot process special file %s" % quote_local_unicode_path(abspath_u))
return None
d.addCallback(_maybe_upload)
def _failed(f):
self._count('objects_queued', -1)
self._count('objects_failed')
- self._log("%r while processing %r" % (f, path_u))
+ self._log("%r while processing %r" % (f, relpath_u))
return f
d.addCallbacks(_succeeded, _failed)
return d
def start_scanning(self):
self._log("\nstart_scanning")
- files = self._db.get_all_files()
+ files = self._db.get_all_relpaths()
self._log("all files %s" % files)
d = self._scan_remote_collective()
v = self._db.get_local_file_version(relpath_u)
return (v is None or v < remote_version)
- def _get_local_latest(self, path_u):
- """_get_local_latest takes a unicode path string checks to see if this file object
+ def _get_local_latest(self, relpath_u):
+ """
+ _get_local_latest takes a unicode path string checks to see if this file object
exists in our magic-folder db; if not then return None
else check for an entry in our magic-folder db and return the version number.
"""
- if not os.path.exists(os.path.join(self._local_path_u,path_u)):
+ abspath_u = self._get_abspath(relpath_u)
+ if not os.path.exists(abspath_u):
return None
- return self._db.get_local_file_version(path_u)
+ return self._db.get_local_file_version(relpath_u)
def _get_collective_latest_file(self, filename):
- """_get_collective_latest_file takes a file path pointing to a file managed by
+ """
+ _get_collective_latest_file takes a file path pointing to a file managed by
magic-folder and returns a deferred that fires with the two tuple containing a
file node and metadata for the latest version of the file located in the
magic-folder collective directory.
def _filter_scan_batch(self, result):
extension = [] # consider whether this should be a dict
- for name in self._download_scan_batch.keys():
- if name in self._pending:
+ for relpath_u in self._download_scan_batch.keys():
+ if relpath_u in self._pending:
continue
- file_node, metadata = max(self._download_scan_batch[name], key=lambda x: x[1]['version'])
- if self._should_download(name, metadata['version']):
- extension += [(name, file_node, metadata)]
+ file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version'])
+ if self._should_download(relpath_u, metadata['version']):
+ extension += [(relpath_u, file_node, metadata)]
return extension
def _when_queue_is_empty(self):
return d
def _process(self, item):
- (name, file_node, metadata) = item
+ (relpath_u, file_node, metadata) = item
d = file_node.download_best_version()
def succeeded(res):
+ abspath_u = self._get_abspath(relpath_u)
d2 = defer.succeed(res)
- absname = fileutil.abspath_expanduser_unicode(name, base=self._local_path_u)
- d2.addCallback(lambda result: self._write_downloaded_file(absname, result, is_conflict=False))
- def do_update_db(full_path):
+ d2.addCallback(lambda result: self._write_downloaded_file(abspath_u, result, is_conflict=False))
+ def do_update_db(written_abspath_u):
filecap = file_node.get_uri()
- try:
- s = os.stat(full_path)
- except:
- raise(Exception("wtf downloaded file %s disappeared" % full_path))
- size = s[stat.ST_SIZE]
- ctime = s[stat.ST_CTIME]
- mtime = s[stat.ST_MTIME]
- self._db.did_upload_file(filecap, name, metadata['version'], mtime, ctime, size)
+ pathinfo = get_pathinfo(written_abspath_u)
+ if not pathinfo.exists:
+ raise Exception("downloaded file %s disappeared" % quote_local_unicode_path(written_abspath_u))
+ self._db.did_upload_file(filecap, relpath_u, metadata['version'],
+ pathinfo.mtime, pathinfo.ctime, pathinfo.size)
d2.addCallback(do_update_db)
# XXX handle failure here with addErrback...
self._count('objects_downloaded')
return f
d.addCallbacks(succeeded, failed)
def remove_from_pending(res):
- self._pending.remove(name)
+ self._pending.remove(relpath_u)
return res
d.addBoth(remove_from_pending)
return d
FUDGE_SECONDS = 10.0
@classmethod
- def _write_downloaded_file(cls, path, file_contents, is_conflict=False, now=None):
+ def _write_downloaded_file(cls, abspath_u, file_contents, is_conflict=False, now=None):
# 1. Write a temporary file, say .foo.tmp.
# 2. is_conflict determines whether this is an overwrite or a conflict.
# 3. Set the mtime of the replacement file to be T seconds before the
#
# Returns the path of the destination file.
- precondition_abspath(path)
- replacement_path = path + u".tmp" # FIXME more unique
- backup_path = path + u".backup"
+ precondition_abspath(abspath_u)
+ replacement_path_u = abspath_u + u".tmp" # FIXME more unique
+ backup_path_u = abspath_u + u".backup"
if now is None:
now = time.time()
- fileutil.write(replacement_path, file_contents)
- os.utime(replacement_path, (now, now - cls.FUDGE_SECONDS))
+ fileutil.write(replacement_path_u, file_contents)
+ os.utime(replacement_path_u, (now, now - cls.FUDGE_SECONDS))
if is_conflict:
- return cls._rename_conflicted_file(path, replacement_path)
+ return cls._rename_conflicted_file(abspath_u, replacement_path_u)
else:
try:
- fileutil.replace_file(path, replacement_path, backup_path)
- return path
+ fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
+ return abspath_u
except fileutil.ConflictError:
- return cls._rename_conflicted_file(path, replacement_path)
+ return cls._rename_conflicted_file(abspath_u, replacement_path_u)
@classmethod
- def _rename_conflicted_file(self, path, replacement_path):
- conflict_path = path + u".conflict"
- fileutil.rename_no_overwrite(replacement_path, conflict_path)
- return conflict_path
+ def _rename_conflicted_file(self, abspath_u, replacement_path_u):
+ conflict_path_u = abspath_u + u".conflict"
+ fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
+ return conflict_path_u