import os
import sys

from twisted.internet import defer
from twisted.python.filepath import FilePath
from twisted.application import service
from foolscap.api import eventually

from allmydata.interfaces import IDirectoryNode
from allmydata.util.encodingutil import quote_output, get_filesystem_encoding
from allmydata.immutable.upload import FileName
15 class DropUploader(service.MultiService):
def __init__(self, client, upload_dircap, local_dir_utf8, inotify=None):
    """Validate the drop-upload settings and register the directory watch.

    client -- the node client. Its 'stats_provider' and 'convergence'
        attributes are read here, and 'create_node_from_uri()' is called
        on it to resolve upload_dircap.
    upload_dircap -- cap URI of the directory that uploads are added to.
        It must resolve to a known, writeable directory node.
    local_dir_utf8 -- UTF-8 encoded byte string naming the local directory
        to watch. A leading '~' is expanded.
    inotify -- optional stand-in for the twisted.internet.inotify module.
        When None, the real module is imported.

    Raises AssertionError if the local path cannot be decoded or encoded,
    does not exist, or is not a directory, or if upload_dircap does not
    refer to a writeable directory.
    """
    service.MultiService.__init__(self)

    # Both the decode and the encode can fail, so they share one handler
    # that reports the offending configuration value.
    try:
        local_dir_u = os.path.expanduser(local_dir_utf8.decode('utf-8'))
        if sys.platform == "win32":
            # Windows keeps the unicode path.
            local_dir = local_dir_u
        else:
            local_dir = local_dir_u.encode(get_filesystem_encoding())
    except (UnicodeEncodeError, UnicodeDecodeError):
        raise AssertionError("The '[drop_upload] local.directory' parameter %s was not valid UTF-8 or "
                             "could not be represented in the filesystem encoding."
                             % quote_output(local_dir_utf8))

    # Kept because the dircap lookup below and the logging helper both go
    # through self._client.
    self._client = client
    self._stats_provider = client.stats_provider
    self._convergence = client.convergence
    self._local_path = FilePath(local_dir)

    # Only fall back to the real module when the caller did not supply one;
    # otherwise the 'inotify' argument would be silently ignored.
    if inotify is None:
        from twisted.internet import inotify
    self._inotify = inotify

    if not self._local_path.exists():
        raise AssertionError("The '[drop_upload] local.directory' parameter was %s but there is no directory at that location." % quote_output(local_dir_u))
    if not self._local_path.isdir():
        raise AssertionError("The '[drop_upload] local.directory' parameter was %s but the thing at that location is not a directory." % quote_output(local_dir_u))

    # TODO: allow a path rather than a cap URI.
    self._parent = self._client.create_node_from_uri(upload_dircap)
    if not IDirectoryNode.providedBy(self._parent):
        raise AssertionError("The '[drop_upload] upload.dircap' parameter does not refer to a directory.")
    if self._parent.is_unknown() or self._parent.is_readonly():
        raise AssertionError("The '[drop_upload] upload.dircap' parameter is not a writecap to a directory.")

    # Default completion hook: accept the result and do nothing.
    self._uploaded_callback = lambda ign: None

    self._notifier = inotify.INotify()

    # We don't watch for IN_CREATE, because that would cause us to read and upload a
    # possibly-incomplete file before the application has closed it. There should always
    # be an IN_CLOSE_WRITE after an IN_CREATE (I think).
    # TODO: what about IN_MOVE_SELF or IN_UNMOUNT?
    mask = inotify.IN_CLOSE_WRITE | inotify.IN_MOVED_TO | inotify.IN_ONLYDIR
    self._notifier.watch(self._local_path, mask=mask, callbacks=[self._notify])
63 d = self._notifier.startReading()
64 self._stats_provider.count('drop_upload.dirs_monitored', 1)
def _notify(self, opaque, path, events_mask):
    """Handle one inotify event: log it, count it as queued, schedule the upload.

    The three arguments are passed through unchanged to self._process,
    which is invoked via foolscap's eventually() rather than called directly.
    """
    readable_events = ', '.join(self._inotify.humanReadableMask(events_mask))
    self._log("inotify event %r, %r, %r\n" % (opaque, path, readable_events))

    # The matching decrement happens in _process once the upload finishes.
    self._stats_provider.count('drop_upload.files_queued', 1)
    eventually(self._process, opaque, path, events_mask)
def _process(self, opaque, path, events_mask):
    """Upload the file at 'path' into the target directory and update the stats.

    path -- object naming the changed file; its basename() and .path are used.
    opaque, events_mask -- accepted for the inotify callback signature and
        not used here.

    Returns the Deferred for the upload. self._uploaded_callback is attached
    with addBoth, so it runs whether the upload succeeded or failed.
    """
    d = defer.succeed(None)

    # FIXME: if this already exists as a mutable file, we replace the directory entry,
    # but we should probably modify the file (as the SFTP frontend does).
    def _add_file(ign):
        name = path.basename()
        # on Windows the name is already Unicode
        if not isinstance(name, unicode):
            name = name.decode(get_filesystem_encoding())

        u = FileName(path.path, self._convergence)
        return self._parent.add_file(name, u)
    d.addCallback(_add_file)

    def _succeeded(ign):
        self._stats_provider.count('drop_upload.files_queued', -1)
        self._stats_provider.count('drop_upload.files_uploaded', 1)

    def _failed(f):
        # Decrement the queue exactly once, whichever branch is taken.
        self._stats_provider.count('drop_upload.files_queued', -1)
        # NOTE(review): the branch condition and the two return statements
        # were not present in the reviewed text; they are reconstructed from
        # the log messages below -- confirm against the upstream file.
        if os.path.exists(path.path):
            self._log("drop-upload: %r failed to upload due to %r" % (path.path, f))
            self._stats_provider.count('drop_upload.files_failed', 1)
            # A real upload failure is passed on down the callback chain.
            return f
        else:
            self._log("drop-upload: notified file %r disappeared "
                      "(this is normal for temporary files): %r" % (path.path, f))
            self._stats_provider.count('drop_upload.files_disappeared', 1)
            # A vanished file is not treated as an error.
            return None

    d.addCallbacks(_succeeded, _failed)
    d.addBoth(self._uploaded_callback)
    return d
def set_uploaded_callback(self, callback):
    """Install the function to be called after a file has been uploaded.

    callback -- a one-argument callable; it replaces whatever hook was
        stored in self._uploaded_callback before.
    """
    self._uploaded_callback = callback
def finish(self, for_tests=False):
    """Stop reading inotify events and decrement the monitored-directory count.

    for_tests -- when true and the notifier has a 'wait_until_stopped'
        method, the result of calling it is returned.

    In every other case an already-fired Deferred (result None) is returned.
    """
    notifier = self._notifier
    notifier.stopReading()
    self._stats_provider.count('drop_upload.dirs_monitored', -1)

    can_wait = for_tests and hasattr(notifier, 'wait_until_stopped')
    if not can_wait:
        return defer.succeed(None)
    return notifier.wait_until_stopped()
119 self._client.log(msg)
120 #open("events", "ab+").write(msg)