]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/frontends/drop_upload.py
6cc2726bd68337d419d3944b15253fe0c0ecc4bd
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / drop_upload.py
1
2 import sys
3
4 from twisted.internet import defer
5 from twisted.python.failure import Failure
6 from twisted.python.filepath import FilePath
7 from twisted.application import service
8 from foolscap.api import eventually
9
10 from allmydata.interfaces import IDirectoryNode
11
12 from allmydata.util.encodingutil import quote_output, get_filesystem_encoding
13 from allmydata.util.fileutil import abspath_expanduser_unicode
14 from allmydata.immutable.upload import FileName
15 from allmydata.scripts import backupdb, tahoe_backup
16
17
18 class DropUploader(service.MultiService):
19     name = 'drop-upload'
20
21     def __init__(self, client, upload_dircap, local_dir_utf8, dbfile, inotify=None):
22         service.MultiService.__init__(self)
23         try:
24             local_dir_u = abspath_expanduser_unicode(local_dir_utf8.decode('utf-8'))
25             if sys.platform == "win32":
26                 local_dir = local_dir_u
27             else:
28                 local_dir = local_dir_u.encode(get_filesystem_encoding())
29         except (UnicodeEncodeError, UnicodeDecodeError):
30             raise AssertionError("The '[drop_upload] local.directory' parameter %s was not valid UTF-8 or "
31                                  "could not be represented in the filesystem encoding."
32                                  % quote_output(local_dir_utf8))
33
34         self._client = client
35         self._stats_provider = client.stats_provider
36         self._convergence = client.convergence
37         self._local_path = FilePath(local_dir)
38         self._dbfile = dbfile
39
40         if inotify is None:
41             from twisted.internet import inotify
42         self._inotify = inotify
43
44         if not self._local_path.exists():
45             raise AssertionError("The '[drop_upload] local.directory' parameter was %s but there is no directory at that location." % quote_output(local_dir_u))
46         if not self._local_path.isdir():
47             raise AssertionError("The '[drop_upload] local.directory' parameter was %s but the thing at that location is not a directory." % quote_output(local_dir_u))
48
49         # TODO: allow a path rather than a cap URI.
50         self._parent = self._client.create_node_from_uri(upload_dircap)
51         if not IDirectoryNode.providedBy(self._parent):
52             raise AssertionError("The '[drop_upload] upload.dircap' parameter does not refer to a directory.")
53         if self._parent.is_unknown() or self._parent.is_readonly():
54             raise AssertionError("The '[drop_upload] upload.dircap' parameter is not a writecap to a directory.")
55
56         self._uploaded_callback = lambda ign: None
57
58         self._notifier = inotify.INotify()
59
60         # We don't watch for IN_CREATE, because that would cause us to read and upload a
61         # possibly-incomplete file before the application has closed it. There should always
62         # be an IN_CLOSE_WRITE after an IN_CREATE (I think).
63         # TODO: what about IN_MOVE_SELF or IN_UNMOUNT?
64         mask = inotify.IN_CLOSE_WRITE | inotify.IN_MOVED_TO | inotify.IN_ONLYDIR
65         self._notifier.watch(self._local_path, mask=mask, callbacks=[self._notify])
66
67     def _check_db_file(self, childpath):
68         """_check_db_file returns True if the file must be uploaded.
69         """
70         assert self._db != None
71         use_timestamps = True
72         r = self._db.check_file(childpath, use_timestamps)
73         # XXX call r.should_check() ?
74         return !r.was_uploaded()
75
76     def _scan(self, localpath):
77         quoted_path = quote_local_unicode_path(localpath)
78
79         try:
80             children = listdir_unicode(localpath)
81         except EnvironmentError:
82             raise(Exception("WARNING: magic folder: permission denied on directory %s" % (quoted_path,)))
83         except FilenameEncodingError:
84             raise(Esception("WARNING: magic folder: could not list directory %s due to a filename encoding error" % (quoted_path,)))
85
86         for child in children:
87             assert isinstance(child, unicode), child
88             childpath = os.path.join(localpath, child)
89             # note: symlinks to directories are both islink() and isdir()
90             if os.path.isdir(childpath) and not os.path.islink(childpath):
91                 metadata = tahoe_backup.get_local_metadata(childpath)
92                 # recurse on the child directory
93                 self.process(childpath)
94             elif os.path.isfile(childpath) and not os.path.islink(childpath):
95                 try:
96                     must_upload = self.check_db_file(childpath)
97                     if must_upload:
98                         
99
100
101     def startService(self):
102         self._db = backupdb.get_backupdb(self._dbfile, stderr)
103         if not self.backupdb:
104             # XXX or raise an exception?
105             return Failure(Exception('ERROR: Unable to load magic folder db.'))
106
107         service.MultiService.startService(self)
108         d = self._notifier.startReading()
109         self._stats_provider.count('drop_upload.dirs_monitored', 1)
110         return d
111
112     def _notify(self, opaque, path, events_mask):
113         self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
114
115         self._stats_provider.count('drop_upload.files_queued', 1)
116         eventually(self._process, opaque, path, events_mask)
117
118     def _process(self, opaque, path, events_mask):
119         d = defer.succeed(None)
120
121         # FIXME: if this already exists as a mutable file, we replace the directory entry,
122         # but we should probably modify the file (as the SFTP frontend does).
123         def _add_file(ign):
124             name = path.basename()
125             # on Windows the name is already Unicode
126             if not isinstance(name, unicode):
127                 name = name.decode(get_filesystem_encoding())
128
129             u = FileName(path.path, self._convergence)
130             return self._parent.add_file(name, u)
131         d.addCallback(_add_file)
132
133         def _succeeded(ign):
134             self._stats_provider.count('drop_upload.files_queued', -1)
135             self._stats_provider.count('drop_upload.files_uploaded', 1)
136         def _failed(f):
137             self._stats_provider.count('drop_upload.files_queued', -1)
138             if path.exists():
139                 self._log("drop-upload: %r failed to upload due to %r" % (path.path, f))
140                 self._stats_provider.count('drop_upload.files_failed', 1)
141                 return f
142             else:
143                 self._log("drop-upload: notified file %r disappeared "
144                           "(this is normal for temporary files): %r" % (path.path, f))
145                 self._stats_provider.count('drop_upload.files_disappeared', 1)
146                 return None
147         d.addCallbacks(_succeeded, _failed)
148         d.addBoth(self._uploaded_callback)
149         return d
150
151     def set_uploaded_callback(self, callback):
152         """This sets a function that will be called after a file has been uploaded."""
153         self._uploaded_callback = callback
154
155     def finish(self, for_tests=False):
156         self._notifier.stopReading()
157         self._stats_provider.count('drop_upload.dirs_monitored', -1)
158         if for_tests and hasattr(self._notifier, 'wait_until_stopped'):
159             return self._notifier.wait_until_stopped()
160         else:
161             return defer.succeed(None)
162
163     def _log(self, msg):
164         self._client.log(msg)
165         #open("events", "ab+").write(msg)