4 from twisted.internet import defer
5 from twisted.python.filepath import FilePath
6 from twisted.application import service
7 from foolscap.api import eventually
9 from allmydata.interfaces import IDirectoryNode
11 from allmydata.util.fileutil import abspath_expanduser_unicode, precondition_abspath
12 from allmydata.util.encodingutil import listdir_unicode, to_filepath, \
13 unicode_from_filepath, quote_local_unicode_path, FilenameEncodingError
14 from allmydata.immutable.upload import FileName
15 from allmydata import backupdb
19 class MagicFolder(service.MultiService):
22 def __init__(self, client, upload_dircap, local_dir, dbfile, inotify=None,
24 precondition_abspath(local_dir)
26 service.MultiService.__init__(self)
27 self._local_dir = abspath_expanduser_unicode(local_dir)
29 self._stats_provider = client.stats_provider
30 self._convergence = client.convergence
31 self._local_path = to_filepath(self._local_dir)
34 self.is_upload_ready = False
37 if sys.platform == "win32":
38 from allmydata.windows import inotify
40 from twisted.internet import inotify
41 self._inotify = inotify
43 if not self._local_path.exists():
44 raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
45 "but there is no directory at that location."
46 % quote_local_unicode_path(local_dir))
47 if not self._local_path.isdir():
48 raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
49 "but the thing at that location is not a directory."
50 % quote_local_unicode_path(local_dir))
52 # TODO: allow a path rather than a cap URI.
53 self._parent = self._client.create_node_from_uri(upload_dircap)
54 if not IDirectoryNode.providedBy(self._parent):
55 raise AssertionError("The URI in 'private/magic_folder_dircap' does not refer to a directory.")
56 if self._parent.is_unknown() or self._parent.is_readonly():
57 raise AssertionError("The URI in 'private/magic_folder_dircap' is not a writecap to a directory.")
59 self._uploaded_callback = lambda ign: None
61 self._notifier = inotify.INotify()
62 if hasattr(self._notifier, 'set_pending_delay'):
63 self._notifier.set_pending_delay(pending_delay)
65 # We don't watch for IN_CREATE, because that would cause us to read and upload a
66 # possibly-incomplete file before the application has closed it. There should always
67 # be an IN_CLOSE_WRITE after an IN_CREATE (I think).
68 # TODO: what about IN_MOVE_SELF or IN_UNMOUNT?
69 mask = inotify.IN_CLOSE_WRITE | inotify.IN_MOVED_TO | inotify.IN_ONLYDIR
70 self._notifier.watch(self._local_path, mask=mask, callbacks=[self._notify])
72 def _check_db_file(self, childpath):
73 # returns True if the file must be uploaded.
74 assert self._db != None
75 r = self._db.check_file(childpath)
76 filecap = r.was_uploaded()
80 def startService(self):
81 self._db = backupdb.get_backupdb(self._dbfile)
83 return Failure(Exception('ERROR: Unable to load magic folder db.'))
85 service.MultiService.startService(self)
86 d = self._notifier.startReading()
87 self._stats_provider.count('drop_upload.dirs_monitored', 1)
90 def upload_ready(self):
91 """upload_ready is used to signal us to start
92 processing the upload items...
94 self.is_upload_ready = True
96 def _notify(self, opaque, path, events_mask):
97 self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
99 self._stats_provider.count('drop_upload.files_queued', 1)
100 eventually(self._process, opaque, path, events_mask)
102 def _process(self, opaque, path, events_mask):
103 d = defer.succeed(None)
105 # FIXME: if this already exists as a mutable file, we replace the directory entry,
106 # but we should probably modify the file (as the SFTP frontend does).
108 name = path.basename()
109 # on Windows the name is already Unicode
110 if not isinstance(name, unicode):
111 name = name.decode(get_filesystem_encoding())
113 u = FileName(path.path, self._convergence)
114 return self._parent.add_file(name, u)
115 d.addCallback(_add_file)
118 self._stats_provider.count('drop_upload.files_queued', -1)
119 self._stats_provider.count('drop_upload.files_uploaded', 1)
121 self._stats_provider.count('drop_upload.files_queued', -1)
123 self._log("drop-upload: %r failed to upload due to %r" % (path.path, f))
124 self._stats_provider.count('drop_upload.files_failed', 1)
127 self._log("drop-upload: notified file %r disappeared "
128 "(this is normal for temporary files): %r" % (path.path, f))
129 self._stats_provider.count('drop_upload.files_disappeared', 1)
131 d.addCallbacks(_succeeded, _failed)
132 d.addBoth(self._uploaded_callback)
135 def set_uploaded_callback(self, callback):
136 """This sets a function that will be called after a file has been uploaded."""
137 self._uploaded_callback = callback
139 def finish(self, for_tests=False):
140 self._notifier.stopReading()
141 self._stats_provider.count('drop_upload.dirs_monitored', -1)
142 if for_tests and hasattr(self._notifier, 'wait_until_stopped'):
143 return self._notifier.wait_until_stopped()
145 return defer.succeed(None)
148 self._client.log(msg)
149 #open("events", "ab+").write(msg)