git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/frontends/drop_upload.py
test_client.py: add a test that the drop-uploader is initialized correctly by client...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / drop_upload.py
1
2 import os, sys
3
4 from twisted.internet import defer
5 from twisted.python.filepath import FilePath
6 from twisted.application import service
7 from foolscap.api import eventually
8
9 from allmydata.interfaces import IDirectoryNode
10
11 from allmydata.util.encodingutil import quote_output, get_filesystem_encoding
12 from allmydata.immutable.upload import FileName
13
14
class DropUploader(service.MultiService):
    """Service that uploads files dropped into a local directory.

    The local directory is watched through inotify. Each file that is closed
    after being written, or moved into the directory, is added as a child of
    the directory node created from the upload dircap.
    """
    name = 'drop-upload'

    def __init__(self, client, upload_dircap, local_dir_utf8, inotify=None):
        """Validate the configuration and register the inotify watch.

        client -- object supplying stats_provider, convergence,
                  create_node_from_uri() and log().
        upload_dircap -- cap URI; it must yield a directory node that is
                         neither unknown nor read-only.
        local_dir_utf8 -- UTF-8 encoded path of the directory to watch
                          (passed through os.path.expanduser).
        inotify -- object providing INotify, humanReadableMask and the IN_*
                   constants; twisted.internet.inotify is imported when this
                   is None.

        Raises AssertionError if the path cannot be decoded/encoded, if it
        does not name an existing directory, or if the dircap is unsuitable.
        """
        service.MultiService.__init__(self)

        try:
            local_dir_u = os.path.expanduser(local_dir_utf8.decode('utf-8'))
            # FilePath is given unicode on Windows and filesystem-encoded
            # bytes everywhere else.
            local_dir = (local_dir_u if sys.platform == "win32"
                         else local_dir_u.encode(get_filesystem_encoding()))
        except (UnicodeEncodeError, UnicodeDecodeError):
            bad_encoding = ("The '[drop_upload] local.directory' parameter %s was not valid UTF-8 or "
                            "could not be represented in the filesystem encoding.")
            raise AssertionError(bad_encoding % quote_output(local_dir_utf8))

        self._client = client
        self._stats_provider = client.stats_provider
        self._convergence = client.convergence
        self._local_path = FilePath(local_dir)

        if inotify is None:
            from twisted.internet import inotify
        self._inotify = inotify

        if not self._local_path.exists():
            missing = ("The '[drop_upload] local.directory' parameter was %s "
                       "but there is no directory at that location.")
            raise AssertionError(missing % quote_output(local_dir_u))
        if not self._local_path.isdir():
            not_a_dir = ("The '[drop_upload] local.directory' parameter was %s "
                         "but the thing at that location is not a directory.")
            raise AssertionError(not_a_dir % quote_output(local_dir_u))

        # TODO: allow a path rather than a cap URI.
        self._parent = self._client.create_node_from_uri(upload_dircap)
        if not IDirectoryNode.providedBy(self._parent):
            raise AssertionError("The '[drop_upload] upload.dircap' parameter does not refer to a directory.")
        if self._parent.is_unknown() or self._parent.is_readonly():
            raise AssertionError("The '[drop_upload] upload.dircap' parameter is not a writecap to a directory.")

        # Default no-op; replaced through set_uploaded_callback().
        self._uploaded_callback = lambda ign: None

        self._notifier = self._inotify.INotify()

        # IN_CREATE is deliberately left out of the mask: reacting to it would
        # mean reading and uploading a possibly-incomplete file before the
        # application has closed it. An IN_CLOSE_WRITE is expected to follow
        # every IN_CREATE.
        # TODO: what about IN_MOVE_SELF or IN_UNMOUNT?
        watch_mask = (self._inotify.IN_CLOSE_WRITE
                      | self._inotify.IN_MOVED_TO
                      | self._inotify.IN_ONLYDIR)
        self._notifier.watch(self._local_path, mask=watch_mask,
                             callbacks=[self._notify])

    def startService(self):
        """Start the service, begin reading inotify events and count the
        monitored directory in the stats. Returns whatever the notifier's
        startReading() returns."""
        service.MultiService.startService(self)
        started = self._notifier.startReading()
        self._stats_provider.count('drop_upload.dirs_monitored', 1)
        return started

    def _notify(self, opaque, path, events_mask):
        """inotify callback: log the event, count the file as queued and
        schedule _process() for it via eventually()."""
        event_names = ', '.join(self._inotify.humanReadableMask(events_mask))
        self._log("inotify event %r, %r, %r\n" % (opaque, path, event_names))

        self._stats_provider.count('drop_upload.files_queued', 1)
        eventually(self._process, opaque, path, events_mask)

    def _process(self, opaque, path, events_mask):
        """Upload the notified file into the parent directory.

        Returns a Deferred to which the upload, the stats bookkeeping and
        finally the uploaded-callback (via addBoth) have been attached.
        """
        # FIXME: if this already exists as a mutable file, we replace the
        # directory entry, but we should probably modify the file (as the
        # SFTP frontend does).
        def _upload(_ignored):
            childname = path.basename()
            # On Windows the name is already Unicode.
            if not isinstance(childname, unicode):
                childname = childname.decode(get_filesystem_encoding())

            uploadable = FileName(path.path, self._convergence)
            return self._parent.add_file(childname, uploadable)

        def _on_success(_ignored):
            self._stats_provider.count('drop_upload.files_queued', -1)
            self._stats_provider.count('drop_upload.files_uploaded', 1)

        def _on_failure(failure):
            self._stats_provider.count('drop_upload.files_queued', -1)
            if not path.exists():
                # The file vanished before it could be uploaded; this is
                # counted separately and the failure is not passed on.
                self._log("drop-upload: notified file %r disappeared "
                          "(this is normal for temporary files): %r" % (path.path, failure))
                self._stats_provider.count('drop_upload.files_disappeared', 1)
                return None
            self._log("drop-upload: %r failed to upload due to %r" % (path.path, failure))
            self._stats_provider.count('drop_upload.files_failed', 1)
            return failure

        upload_d = defer.succeed(None)
        upload_d.addCallback(_upload)
        upload_d.addCallbacks(_on_success, _on_failure)
        upload_d.addBoth(self._uploaded_callback)
        return upload_d

    def set_uploaded_callback(self, callback):
        """This sets a function that will be called after a file has been uploaded."""
        self._uploaded_callback = callback

    def finish(self, for_tests=False):
        """Stop reading inotify events and uncount the monitored directory.

        Returns the notifier's wait_until_stopped() result when for_tests is
        true and the notifier offers that method; otherwise an
        already-fired Deferred.
        """
        self._notifier.stopReading()
        self._stats_provider.count('drop_upload.dirs_monitored', -1)
        if for_tests and hasattr(self._notifier, 'wait_until_stopped'):
            return self._notifier.wait_until_stopped()
        return defer.succeed(None)

    def _log(self, msg):
        """Forward msg to the client's log."""
        self._client.log(msg)