
import sys, os, stat
import os.path
from collections import deque
import time

from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo, \
     abspath_expanduser_unicode
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import HookMixin
from allmydata.util.encodingutil import listdir_unicode, to_filepath, \
     unicode_from_filepath, quote_local_unicode_path, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
from allmydata import backupdb, magicpath


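# Value of the Linux inotify flag IN_EXCL_UNLINK (0x04000000 in
# <sys/inotify.h>), which stops events being generated for watched children
# after they have been unlinked. Twisted's inotify module does not appear to
# export it, so it is defined here by value.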
IN_EXCL_UNLINK = 0x04000000L

def get_inotify_module():
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for drop-upload is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for drop-upload is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise


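# MagicFolder ties an Uploader and a Downloader together as one Twisted
# service: local filesystem changes are pushed to the upload dirnode, and
# newer versions found in the collective dirnode are pulled down. A rough
# usage sketch (names as in this file; the client object is assumed to be a
# Tahoe client node):
#
#   mf = MagicFolder(client, upload_dircap, collective_dircap,
#                    local_path_u, dbfile)
#   mf.setServiceParent(client)  # startService() begins inotify monitoring
#   mf.ready()                   # start scanning and processing both queues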
class MagicFolder(service.MultiService):
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, inotify=None,
                 pending_delay=1.0):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        db = backupdb.get_backupdb(dbfile, create_version=(backupdb.SCHEMA_v3, 3))
        if db is None:
            # A Failure returned from __init__ would be silently discarded,
            # so raise instead.
            raise Exception('ERROR: Unable to load magic folder db.')

        # for tests
        self._client = client
        self._db = db

        self.is_ready = False

        self.uploader = Uploader(client, local_path_u, db, upload_dircap, inotify, pending_delay)
        self.downloader = Downloader(client, local_path_u, db, collective_dircap)

    def startService(self):
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """ready is used to signal us to start processing the upload and
        download queues.
        """
        self.is_ready = True
        d = self.uploader.start_scanning()
        d2 = self.downloader.start_scanning()
        d.addCallback(lambda ign: d2)
        return d

    def finish(self):
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        return service.MultiService.disownServiceParent(self)


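# QueueMixin holds the machinery shared by Uploader and Downloader: work
# items are appended to self._deque, and _turn_deque() chains each item onto
# self._lazy_tail so items are processed strictly one at a time, with
# self._turn_delay seconds between turns.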
class QueueMixin(HookMixin):
    def __init__(self, client, local_path_u, db, name):
        self._client = client
        self._local_path_u = local_path_u
        self._local_path = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._hooks = {'processed': None}

        if not self._local_path.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_path.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        self._lazy_tail = defer.succeed(None)
        self._pending = set()
        self._stopped = False
        self._turn_delay = 0
        # is_ready is tested in _append_to_deque below; define it here so that
        # subclasses which never set it don't hit an AttributeError.
        self.is_ready = False

    def _count(self, counter_name, delta=1):
        self._client.stats_provider.count('magic_folder.%s.%s' % (self._name, counter_name), delta)

    def _log(self, msg):
        s = "Magic Folder %s: %s" % (self._name, msg)
        self._client.log(s)
        print s

    def _append_to_deque(self, path):
        if path in self._pending:
            return
        self._deque.append(path)
        self._pending.add(path)
        self._count('objects_queued')
        if self.is_ready:
            reactor.callLater(0, self._turn_deque)

    def _turn_deque(self):
        if self._stopped:
            return
        try:
            item = self._deque.pop()
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._turn_delay, self._turn_deque))


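# Uploader watches the local directory with inotify and mirrors changes into
# the upload dirnode. Each file carries a monotonically increasing 'version'
# number in its Tahoe metadata; a deletion is recorded by uploading an empty
# file whose metadata has deleted=True and a bumped version.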
class Uploader(QueueMixin):
    def __init__(self, client, local_path_u, db, upload_dircap, inotify, pending_delay):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader')

        self.is_ready = False

        # TODO: allow a path rather than a cap URI.
        self._upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        if not IDirectoryNode.providedBy(self._upload_dirnode):
            raise AssertionError("The URI in 'private/magic_folder_dircap' does not refer to a directory.")
        if self._upload_dirnode.is_unknown() or self._upload_dirnode.is_readonly():
            raise AssertionError("The URI in 'private/magic_folder_dircap' is not a writecap to a directory.")

        self._inotify = inotify or get_inotify_module()
        self._notifier = self._inotify.INotify()

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # We don't watch for IN_CREATE, because that would cause us to read and upload a
        # possibly-incomplete file before the application has closed it. There should always
        # be an IN_CLOSE_WRITE after an IN_CREATE (I think).
        # TODO: what about IN_MOVE_SELF, IN_MOVED_FROM, or IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_path, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        d = self._notifier.startReading()
        self._count('dirs_monitored')
        return d

    def stop(self):
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        self.is_ready = True
        d = self._scan(self._local_path_u) # XXX do not want dropped deferreds!
        self._turn_deque()
        return d

    def _scan(self, local_path_u):  # XXX should this take a FilePath?
        if not os.path.isdir(local_path_u):
            raise AssertionError("Programmer error: _scan() must be passed a directory path.")
        quoted_path = quote_local_unicode_path(local_path_u)
        try:
            children = listdir_unicode(local_path_u)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s" % (quoted_path,))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error" % (quoted_path,))

        d = defer.succeed(None)
        for child in children:
            assert isinstance(child, unicode), child
            d.addCallback(lambda ign, child=child: os.path.join(local_path_u, child))
            d.addCallback(self._process_child)
            d.addErrback(log.err)

        return d

    def _notify(self, opaque, path, events_mask):
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        path_u = unicode_from_filepath(path)
        self._append_to_deque(path_u)

    def _when_queue_is_empty(self):
        return defer.succeed(None)

    def _process_child(self, path_u):
        precondition(isinstance(path_u, unicode), path_u)

        pathinfo = get_pathinfo(path_u)

        if pathinfo.islink:
            self._log("WARNING: cannot backup symlink %s" % quote_local_unicode_path(path_u))
            return None
        elif pathinfo.isdir:
            # process directories unconditionally
            self._append_to_deque(path_u)

            # recurse on the child directory
            return self._scan(path_u)
        elif pathinfo.isfile:
            file_version = self._db.get_local_file_version(path_u)
            if file_version is None:
                # XXX upload if we didn't record our version in magicfolder db?
                self._append_to_deque(path_u)
                return None
            else:
                # XXX _get_collective_latest_file is defined on Downloader,
                # not on this class, so this branch cannot work as written.
                d2 = self._get_collective_latest_file(path_u)
                def _got_latest_file((file_node, metadata)):
                    collective_version = metadata['version']
                    if collective_version is None:
                        return None
                    if file_version > collective_version:
                        self._append_to_deque(path_u)
                    elif file_version < collective_version: # FIXME Daira thinks this is wrong
                        # if a collective version of the file is newer than ours
                        # we must download it and unlink the old file from our upload dirnode
                        self._append_to_download_deque(path_u)  # XXX no such method here; downloads belong to Downloader
                        # XXX where should we save the returned deferred?
                        return self._upload_dirnode.delete(path_u, must_be_file=True)
                    else:
                        # XXX same version. do nothing.
                        pass
                d2.addCallback(_got_latest_file)
                return d2
        else:
            self._log("WARNING: cannot backup special file %s" % quote_local_unicode_path(path_u))
            return None

    def _process(self, path_u):
        precondition(isinstance(path_u, unicode), path_u)

        d = defer.succeed(None)

        def _maybe_upload(val):
            pathinfo = get_pathinfo(path_u)

            self._pending.remove(path_u)  # FIXME make _upload_pending hold relative paths
            relpath_u = os.path.relpath(path_u, self._local_path_u)
            encoded_name_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                self._log("drop-upload: notified object %r disappeared "
                          "(this is normal for temporary objects)" % (path_u,))
                self._count('objects_disappeared')
                d2 = defer.succeed(None)
                if self._db.check_file_db_exists(relpath_u):
                    d2.addCallback(lambda ign: self._get_metadata(encoded_name_u))
                    current_version = self._db.get_local_file_version(relpath_u) + 1
                    def set_deleted(metadata):
                        metadata['version'] = current_version
                        metadata['deleted'] = True
                        empty_uploadable = Data("", self._client.convergence)
                        return self._upload_dirnode.add_file(encoded_name_u, empty_uploadable, overwrite=True, metadata=metadata)
                    d2.addCallback(set_deleted)
                    def add_db_entry(filenode):
                        filecap = filenode.get_uri()
                        size = 0
                        now = time.time()
                        ctime = now
                        mtime = now
                        self._db.did_upload_file(filecap, relpath_u, current_version, int(mtime), int(ctime), size)
                        self._count('files_uploaded')
                    d2.addCallback(lambda x: self._get_filenode(encoded_name_u))
                    d2.addCallback(add_db_entry)

                d2.addCallback(lambda x: Exception("file does not exist"))  # FIXME wrong
                return d2
            elif pathinfo.islink:
                self._log("WARNING: cannot upload symlink %s" % quote_local_unicode_path(path_u))
                return None
            elif pathinfo.isdir:
                self._notifier.watch(to_filepath(path_u), mask=self.mask, callbacks=[self._notify], recursive=True)
                uploadable = Data("", self._client.convergence)
                encoded_name_u += u"@_"
                upload_d = self._upload_dirnode.add_file(encoded_name_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (path_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (path_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(path_u))
                return upload_d
            elif pathinfo.isfile:
                version = self._db.get_local_file_version(relpath_u)
                if version is None:
                    version = 0
                else:
                    version += 1

                uploadable = FileName(path_u, self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_name_u, uploadable, metadata={"version":version}, overwrite=True)
                def add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    # XXX maybe just pass pathinfo
                    self._db.did_upload_file(filecap, relpath_u, version,
                                             pathinfo.mtime, pathinfo.ctime, pathinfo.size)
                    self._count('files_uploaded')
                d2.addCallback(add_db_entry)
                return d2
            else:
                self._log("WARNING: cannot process special file %s" % quote_local_unicode_path(path_u))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_queued', -1)
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_queued', -1)
            self._count('objects_failed')
            self._log("%r while processing %r" % (f, path_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_name_u):
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_name_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_name_u):
        try:
            d = self._upload_dirnode.get(encoded_name_u)
        except KeyError:
            return Failure()
        return d


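# Downloader periodically lists the read-only collective dirnode, compares
# each remote entry's 'version' metadata against the local magic-folder db,
# and queues entries that are strictly newer for download.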
class Downloader(QueueMixin):
    def __init__(self, client, local_path_u, db, collective_dircap):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader')

        # TODO: allow a path rather than a cap URI.
        self._collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        if not IDirectoryNode.providedBy(self._collective_dirnode):
            raise AssertionError("The URI in 'private/collective_dircap' does not refer to a directory.")
        if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly():
            raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.")

        self._turn_delay = 3 # delay between remote scans
        self._download_scan_batch = {} # path -> [(filenode, metadata)]

    def start_scanning(self):
        self._log("\nstart_scanning")
        files = self._db.get_all_files()
        self._log("all files %s" % files)

        d = self._scan_remote_collective()
        self._turn_deque()
        return d

    def stop(self):
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.
        """
        v = self._db.get_local_file_version(relpath_u)
        return (v is None or v < remote_version)

    def _get_local_latest(self, path_u):
        """_get_local_latest takes a unicode path string and checks whether the
        file object exists in our magic-folder db; it returns None if not,
        otherwise the version number recorded for that entry.
        """
        if not os.path.exists(path_u):
            return None
        return self._db.get_local_file_version(path_u)

    def _get_collective_latest_file(self, filename):
        """_get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with a two-tuple containing
        the file node and metadata for the latest version of the file located in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _append_to_batch(self, name, file_node, metadata):
        if name in self._download_scan_batch:
            self._download_scan_batch[name] += [(file_node, metadata)]
        else:
            self._download_scan_batch[name] = [(file_node, metadata)]

    def _scan_remote(self, nickname, dirnode):
        self._log("_scan_remote nickname %s" % nickname)
        d = dirnode.list()
        def scan_listing(listing_map):
            for name in listing_map.keys():
                file_node, metadata = listing_map[name]
                local_version = self._get_local_latest(name)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (name, local_version, remote_version))
                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("added to download queue\n")
                    self._append_to_batch(name, file_node, metadata)
        d.addCallback(scan_listing)
        return d

    def _scan_remote_collective(self):
        self._log("_scan_remote_collective")
        self._download_scan_batch = {} # XXX

        if self._collective_dirnode is None:
            # return a fired Deferred so callers can always chain on the result
            return defer.succeed(None)
        collective_dirmap_d = self._collective_dirnode.list()
        def do_list(result):
            others = result.keys()
            return result, others
        collective_dirmap_d.addCallback(do_list)
        def scan_collective(result):
            d = defer.succeed(None)
            collective_dirmap, others_list = result
            for dir_name in others_list:
                d.addCallback(lambda x, dir_name=dir_name: self._scan_remote(dir_name, collective_dirmap[dir_name][0]))
                # XXX todo add errback
            return d
        collective_dirmap_d.addCallback(scan_collective)
        collective_dirmap_d.addCallback(self._filter_scan_batch)
        collective_dirmap_d.addCallback(self._add_batch_to_download_queue)
        return collective_dirmap_d

    def _add_batch_to_download_queue(self, result):
        self._deque.extend(result)
        self._pending.update(map(lambda x: x[0], result))

    def _filter_scan_batch(self, result):
        extension = [] # consider whether this should be a dict
        for name in self._download_scan_batch.keys():
            if name in self._pending:
                continue
            file_node, metadata = max(self._download_scan_batch[name], key=lambda x: x[1]['version'])
            if self._should_download(name, metadata['version']):
                extension += [(name, file_node, metadata)]
        return extension

    def _when_queue_is_empty(self):
        d = task.deferLater(reactor, self._turn_delay, self._scan_remote_collective)
        d.addCallback(lambda ign: self._turn_deque())
        return d

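    # Each queued item is the (name, file_node, metadata) tuple produced by
    # _filter_scan_batch above.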
    def _process(self, item):
        (name, file_node, metadata) = item
        d = file_node.download_best_version()
        def succeeded(res):
            d2 = defer.succeed(res)
            absname = abspath_expanduser_unicode(name, base=self._local_path_u)
            d2.addCallback(lambda result: self._write_downloaded_file(absname, result, self._local_path_u, is_conflict=False))
            def do_update_db(full_path):
                filecap = file_node.get_uri()
                try:
                    s = os.stat(full_path)
                except OSError:
                    raise Exception("downloaded file %s disappeared" % quote_local_unicode_path(full_path))
                size = s[stat.ST_SIZE]
                ctime = s[stat.ST_CTIME]
                mtime = s[stat.ST_MTIME]
                self._db.did_upload_file(filecap, name, metadata['version'], mtime, ctime, size)
            d2.addCallback(do_update_db)
            # XXX handle failure here with addErrback...
            self._count('objects_downloaded')
            return d2
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_download_failed')
            return f
        d.addCallbacks(succeeded, failed)
        def remove_from_pending(res):
            self._pending.remove(name)
            return res
        d.addBoth(remove_from_pending)
        return d

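    # The replacement file's mtime is set FUDGE_SECONDS in the past (see
    # _write_downloaded_file below), presumably so that a genuine local edit
    # made after the download can be distinguished from the write we
    # performed ourselves.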
    FUDGE_SECONDS = 10.0

    @classmethod
    def _write_downloaded_file(cls, path, file_contents, base, is_conflict=False, now=None):
        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition(isinstance(path, unicode), path=path)
        path = fileutil.abspath_expanduser_unicode(path, base=base)
        replacement_path = path + u".tmp"  # FIXME more unique
        backup_path = path + u".backup"
        if now is None:
            now = time.time()

        fileutil.write(replacement_path, file_contents)
        os.utime(replacement_path, (now, now - cls.FUDGE_SECONDS))
        if is_conflict:
            return cls._rename_conflicted_file(path, replacement_path)
        else:
            try:
                fileutil.replace_file(path, replacement_path, backup_path)
                return path
            except fileutil.ConflictError:
                return cls._rename_conflicted_file(path, replacement_path)

    @classmethod
    def _rename_conflicted_file(cls, path, replacement_path):
        conflict_path = path + u".conflict"
        fileutil.rename_no_overwrite(replacement_path, conflict_path)
        return conflict_path