]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/frontends/drop_upload.py
Move backupdb.py to src/allmydata.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / drop_upload.py
1
2 import sys
3
4 from twisted.internet import defer
5 from twisted.python.filepath import FilePath
6 from twisted.application import service
7 from foolscap.api import eventually
8
9 from allmydata.interfaces import IDirectoryNode
10
11 from allmydata.util.fileutil import abspath_expanduser_unicode, precondition_abspath
12 from allmydata.util.encodingutil import listdir_unicode, to_filepath, \
13      unicode_from_filepath, quote_local_unicode_path, FilenameEncodingError
14 from allmydata.immutable.upload import FileName
15 from allmydata import backupdb
16
17
class DropUploader(service.MultiService):
    """Service that watches one local directory and uploads files from it.

    An inotify watch is placed on the local directory. Every
    IN_CLOSE_WRITE or IN_MOVED_TO event reported for it queues an upload of
    the affected file into the directory node created from
    ``upload_dircap``. Progress is reported through the client's stats
    provider under the ``drop_upload.*`` counters.
    """
    name = 'drop-upload'

    def __init__(self, client, upload_dircap, local_dir, dbfile, inotify=None,
                 pending_delay=1.0):
        """Validate the configuration and register the inotify watch.

        :param client: object providing ``stats_provider``, ``convergence``,
            ``create_node_from_uri()`` and ``log()``.
        :param upload_dircap: URI handed to ``client.create_node_from_uri``;
            it must yield a writeable directory node.
        :param local_dir: absolute path of the directory to watch (checked
            with ``precondition_abspath``).
        :param dbfile: path handed to ``backupdb.get_backupdb`` when the
            service is started.
        :param inotify: module providing ``INotify``, the ``IN_*`` mask
            constants and ``humanReadableMask``. When None, a platform
            default is imported.
        :param pending_delay: forwarded to the notifier's
            ``set_pending_delay`` when the notifier has such a method.
        :raises AssertionError: if ``local_dir`` does not exist or is not a
            directory, or if ``upload_dircap`` does not refer to a writeable
            directory.
        """
        precondition_abspath(local_dir)

        service.MultiService.__init__(self)
        self._local_dir = abspath_expanduser_unicode(local_dir)
        self._client = client
        self._stats_provider = client.stats_provider
        self._convergence = client.convergence
        self._local_path = to_filepath(self._local_dir)
        self._dbfile = dbfile
        # The database is only opened in startService(); initialise the
        # attribute here so that using it too early trips the assertion in
        # _check_db_file() instead of raising AttributeError.
        self._db = None

        # Set by upload_ready(); nothing visible in this class reads it.
        self.is_upload_ready = False

        if inotify is None:
            if sys.platform == "win32":
                from allmydata.windows import inotify
            else:
                from twisted.internet import inotify
        self._inotify = inotify

        if not self._local_path.exists():
            raise AssertionError("The '[drop_upload] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(local_dir))
        if not self._local_path.isdir():
            raise AssertionError("The '[drop_upload] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(local_dir))

        # TODO: allow a path rather than a cap URI.
        self._parent = self._client.create_node_from_uri(upload_dircap)
        if not IDirectoryNode.providedBy(self._parent):
            raise AssertionError("The URI in 'private/drop_upload_dircap' does not refer to a directory.")
        if self._parent.is_unknown() or self._parent.is_readonly():
            raise AssertionError("The URI in 'private/drop_upload_dircap' is not a writecap to a directory.")

        # Default post-upload hook does nothing; see set_uploaded_callback().
        self._uploaded_callback = lambda ign: None

        self._notifier = inotify.INotify()
        # Not every notifier implementation offers set_pending_delay.
        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # We don't watch for IN_CREATE, because that would cause us to read and upload a
        # possibly-incomplete file before the application has closed it. There should always
        # be an IN_CLOSE_WRITE after an IN_CREATE (I think).
        # TODO: what about IN_MOVE_SELF or IN_UNMOUNT?
        mask = inotify.IN_CLOSE_WRITE | inotify.IN_MOVED_TO | inotify.IN_ONLYDIR
        self._notifier.watch(self._local_path, mask=mask, callbacks=[self._notify])

    def _check_db_file(self, childpath):
        """Report whether the file at ``childpath`` still has to be uploaded.

        :param childpath: path passed to the database's ``check_file``.
        :returns: True when ``was_uploaded()`` on the check result is exactly
            False, otherwise False.
        """
        assert self._db is not None
        r = self._db.check_file(childpath)
        # Any result other than the literal False (presumably a filecap)
        # is treated as "already uploaded".
        return r.was_uploaded() is False

    def startService(self):
        """Open the database, start the service and begin reading events.

        :returns: a ``Failure`` wrapping an ``Exception`` when the database
            cannot be loaded (the service is not started in that case);
            otherwise whatever the notifier's ``startReading()`` returns.
        """
        # Imported locally because the module-level imports do not provide it.
        from twisted.python.failure import Failure

        self._db = backupdb.get_backupdb(self._dbfile)
        if self._db is None:
            return Failure(Exception('ERROR: Unable to load magic folder db.'))

        service.MultiService.startService(self)
        d = self._notifier.startReading()
        self._stats_provider.count('drop_upload.dirs_monitored', 1)
        return d

    def upload_ready(self):
        """upload_ready is used to signal us to start
        processing the upload items...
        """
        self.is_upload_ready = True

    def _notify(self, opaque, path, events_mask):
        """Callback registered with the notifier's watch.

        Logs the event, counts the file as queued and schedules
        ``_process`` with the same arguments via foolscap's ``eventually``.
        """
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))

        self._stats_provider.count('drop_upload.files_queued', 1)
        eventually(self._process, opaque, path, events_mask)

    def _process(self, opaque, path, events_mask):
        """Upload the file named by ``path`` into the parent directory node.

        :param opaque: passed through from the notifier; unused here.
        :param path: object offering ``basename()``, ``path`` and
            ``exists()`` (as used below) for the file that triggered the
            event.
        :param events_mask: passed through from the notifier; unused here.
        :returns: a Deferred that fires after the upload attempt and after
            the uploaded-callback has been invoked with its outcome.
        """
        # Imported locally because the module-level import list omits it.
        from allmydata.util.encodingutil import get_filesystem_encoding

        d = defer.succeed(None)

        # FIXME: if this already exists as a mutable file, we replace the directory entry,
        # but we should probably modify the file (as the SFTP frontend does).
        def _add_file(ign):
            name = path.basename()
            # on Windows the name is already Unicode
            if not isinstance(name, unicode):
                name = name.decode(get_filesystem_encoding())

            u = FileName(path.path, self._convergence)
            return self._parent.add_file(name, u)
        d.addCallback(_add_file)

        def _succeeded(ign):
            self._stats_provider.count('drop_upload.files_queued', -1)
            self._stats_provider.count('drop_upload.files_uploaded', 1)
        def _failed(f):
            self._stats_provider.count('drop_upload.files_queued', -1)
            if path.exists():
                # The file is still there, so this is a real failure:
                # record it and keep propagating the Failure.
                self._log("drop-upload: %r failed to upload due to %r" % (path.path, f))
                self._stats_provider.count('drop_upload.files_failed', 1)
                return f
            else:
                # The file vanished before it could be uploaded; swallow
                # the error so the chain continues on the success path.
                self._log("drop-upload: notified file %r disappeared "
                          "(this is normal for temporary files): %r" % (path.path, f))
                self._stats_provider.count('drop_upload.files_disappeared', 1)
                return None
        d.addCallbacks(_succeeded, _failed)
        # Runs for both outcomes, receiving either the result or the Failure.
        d.addBoth(self._uploaded_callback)
        return d

    def set_uploaded_callback(self, callback):
        """This sets a function that will be called after a file has been uploaded."""
        self._uploaded_callback = callback

    def finish(self, for_tests=False):
        """Stop reading inotify events and decrement the monitored counter.

        :param for_tests: when true and the notifier offers
            ``wait_until_stopped``, return that method's result instead of
            an already-fired Deferred.
        :returns: the result of ``wait_until_stopped()`` or
            ``defer.succeed(None)``.
        """
        self._notifier.stopReading()
        self._stats_provider.count('drop_upload.dirs_monitored', -1)
        if for_tests and hasattr(self._notifier, 'wait_until_stopped'):
            return self._notifier.wait_until_stopped()
        else:
            return defer.succeed(None)

    def _log(self, msg):
        """Forward ``msg`` to the client's ``log`` method."""
        self._client.log(msg)