import sys, os
import os.path
from collections import deque
import time

from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import HookMixin
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
     extend_filepath, unicode_from_filepath, unicode_segments_from, \
     quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
from allmydata import backupdb, magicpath


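# Linux inotify flag IN_EXCL_UNLINK (0x04000000 in <sys/inotify.h>), hardcoded
# here presumably because Twisted's inotify wrapper does not export it. With
# this flag the kernel stops delivering events for a child once it has been
# unlinked from the watched directory.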
IN_EXCL_UNLINK = 0x04000000L
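# Select a platform-appropriate filesystem-notification backend: the bundled
# allmydata.windows.inotify shim on Windows, or Twisted's native inotify
# support on Linux. Other platforms are unsupported.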
def get_inotify_module():
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for drop-upload is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for drop-upload is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise


class MagicFolder(service.MultiService):
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
                 pending_delay=1.0, clock=reactor):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        db = backupdb.get_backupdb(dbfile, create_version=(backupdb.SCHEMA_v3, 3))
        if db is None:
            raise Exception('ERROR: Unable to load magic folder db.')

        # for tests
        self._client = client
        self._db = db

        self.is_ready = False

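        # The uploader watches the local directory and pushes changes to the
        # upload dircap; the downloader polls the collective dircap and pulls
        # newer versions down. Each runs its own processing queue.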
        self.uploader = Uploader(client, local_path_u, db, upload_dircap, pending_delay, clock)
        self.downloader = Downloader(client, local_path_u, db, collective_dircap, clock)

    def startService(self):
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        print "%r.startService" % (self,)
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """ready is used to signal us to start processing the
        upload and download queues.
        """
        self.is_ready = True
        d = self.uploader.start_scanning()
        d2 = self.downloader.start_scanning()
        d.addCallback(lambda ign: d2)
        return d

    def finish(self):
        print "finish"
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        return service.MultiService.disownServiceParent(self)


class QueueMixin(HookMixin):
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        self._lazy_tail = defer.succeed(None)
        self._pending = set()
        self._stopped = False
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        print "_get_relpath(%r)" % (filepath,)
        segments = unicode_segments_from(filepath, self._local_filepath)
        print "segments = %r" % (segments,)
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        print "%r += %r" % (ctr, delta)
        self._client.stats_provider.count(ctr, delta)

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _append_to_deque(self, relpath_u):
        print "_append_to_deque(%r)" % (relpath_u,)
        if relpath_u in self._pending or magicpath.should_ignore_file(relpath_u):
            return
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            self._clock.callLater(0, self._turn_deque)

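    # _turn_deque pops one item per turn and chains its processing onto
    # self._lazy_tail, so items are handled strictly one at a time; the
    # deferLater call yields the reactor for _turn_delay seconds between turns.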
    def _turn_deque(self):
        if self._stopped:
            return
        try:
            item = self._deque.pop()
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))


class Uploader(QueueMixin):
    def __init__(self, client, local_path_u, db, upload_dircap, pending_delay, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False

        # TODO: allow a path rather than a cap URI.
        self._upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        if not IDirectoryNode.providedBy(self._upload_dirnode):
            raise AssertionError("The URI in 'private/magic_folder_dircap' does not refer to a directory.")
        if self._upload_dirnode.is_unknown() or self._upload_dirnode.is_readonly():
            raise AssertionError("The URI in 'private/magic_folder_dircap' is not a writecap to a directory.")

        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # We don't watch for IN_CREATE, because that would cause us to read and upload a
        # possibly-incomplete file before the application has closed it. There should always
        # be an IN_CLOSE_WRITE after an IN_CREATE (I think).
        # TODO: what about IN_MOVE_SELF, IN_MOVED_FROM, or IN_UNMOUNT?
        #
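        # IN_EXCL_UNLINK (hardcoded at module level, since Twisted does not
        # appear to export it) suppresses events generated for children after
        # they have been unlinked, e.g. writes to a still-open deleted file.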
        self.mask = ( self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        self._log("start_scanning")
        self.is_ready = True
        self._pending = self._db.get_all_relpaths()
        print "all_files %r" % (self._pending)
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already processed
            # (normally because they have been deleted on disk).
            print "adding %r" % (self._pending)
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            assert isinstance(child, unicode), child
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def _notify(self, opaque, path, events_mask):
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)
        self._append_to_deque(relpath_u)

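    # The uploader's queue is fed by inotify events, so when it drains there is
    # nothing to reschedule; contrast Downloader._when_queue_is_empty, which
    # kicks off another remote scan.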
    def _when_queue_is_empty(self):
        return defer.succeed(None)

    def _process(self, relpath_u):
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val):
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            print "pending = %r, about to remove %r" % (self._pending, relpath_u)
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')
                d2 = defer.succeed(None)
                if self._db.check_file_db_exists(relpath_u):
                    d2.addCallback(lambda ign: self._get_metadata(encoded_path_u))
                    current_version = self._db.get_local_file_version(relpath_u) + 1
                    def set_deleted(metadata):
                        metadata['version'] = current_version
                        metadata['deleted'] = True
                        empty_uploadable = Data("", self._client.convergence)
                        return self._upload_dirnode.add_file(encoded_path_u, empty_uploadable, overwrite=True, metadata=metadata)
                    d2.addCallback(set_deleted)
                    def add_db_entry(filenode):
                        filecap = filenode.get_uri()
                        self._db.did_upload_version(filecap, relpath_u, current_version, pathinfo)
                        self._count('files_uploaded')
                    # FIXME consider whether it's correct to retrieve the filenode again.
                    d2.addCallback(lambda x: self._get_filenode(encoded_path_u))
                    d2.addCallback(add_db_entry)

                d2.addCallback(lambda x: Exception("file does not exist"))  # FIXME wrong
                return d2
            elif pathinfo.islink:
                self._log("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)
                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                version = self._db.get_local_file_version(relpath_u)
                if version is None:
                    version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    version += 1
                else:
                    return None

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":version}, overwrite=True)
                def add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    self._db.did_upload_version(filecap, relpath_u, version, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(add_db_entry)
                return d2
            else:
                self._log("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            print f
            self._count('objects_failed')
            self._log("%r while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d


class Downloader(QueueMixin):
    def __init__(self, client, local_path_u, db, collective_dircap, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        # TODO: allow a path rather than a cap URI.
        self._collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        if not IDirectoryNode.providedBy(self._collective_dirnode):
            raise AssertionError("The URI in 'private/collective_dircap' does not refer to a directory.")
        if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly():
            raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.")

        self._turn_delay = 3 # delay between remote scans
        self._download_scan_batch = {} # path -> [(filenode, metadata)]

    def start_scanning(self):
        self._log("\nstart_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective()
        self._turn_deque()
        return d

    def stop(self):
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether a remote object should
        be downloaded. We check the remote metadata version against the version
        in our magic-folder db; the latest version wins.
        """
        if magicpath.should_ignore_file(relpath_u):
            return False
        v = self._db.get_local_file_version(relpath_u)
        return (v is None or v < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode path string and checks whether the
        file exists locally; if not, it returns None, otherwise it returns the
        version number recorded for that file in our magic-folder db.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        return self._db.get_local_file_version(relpath_u)

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed
        by magic-folder and returns a deferred that fires with a 2-tuple of
        (file node, metadata) for the latest version of the file located in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _append_to_batch(self, name, file_node, metadata):
        self._download_scan_batch.setdefault(name, []).append((file_node, metadata))

    def _scan_remote(self, nickname, dirnode):
        self._log("_scan_remote nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for name in listing_map.keys():
                file_node, metadata = listing_map[name]
                local_version = self._get_local_latest(name)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (name, local_version, remote_version))
                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("added to download queue\n")
                    self._append_to_batch(name, file_node, metadata)
        d.addCallback(scan_listing)
        return d

    def _scan_remote_collective(self):
        self._log("_scan_remote_collective")
        self._download_scan_batch = {} # XXX

        if self._collective_dirnode is None:
            return defer.succeed(None)
        collective_dirmap_d = self._collective_dirnode.list()
        def do_list(result):
            others = [x for x in result.keys()]
            return result, others
        collective_dirmap_d.addCallback(do_list)
        def scan_collective(result):
            d = defer.succeed(None)
            collective_dirmap, others_list = result
            for dir_name in others_list:
                d.addCallback(lambda x, dir_name=dir_name: self._scan_remote(dir_name, collective_dirmap[dir_name][0]))
                # XXX todo add errback
            return d
        collective_dirmap_d.addCallback(scan_collective)
        collective_dirmap_d.addCallback(self._filter_scan_batch)
        collective_dirmap_d.addCallback(self._add_batch_to_download_queue)
        return collective_dirmap_d

    def _add_batch_to_download_queue(self, result):
        print "result = %r" % (result,)
        print "deque = %r" % (self._deque,)
        self._deque.extend(result)
        print "deque after = %r" % (self._deque,)
        self._count('objects_queued', len(result))
        print "pending = %r" % (self._pending,)
        self._pending.update(map(lambda x: x[0], result))
        print "pending after = %r" % (self._pending,)

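    # Each batch entry is keyed by relpath and holds one (filenode, metadata)
    # pair per participant directory that contains the file; we keep only the
    # highest-version pair, and skip paths that are already pending.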
    def _filter_scan_batch(self, result):
        extension = [] # consider whether this should be a dict
        for relpath_u in self._download_scan_batch.keys():
            if relpath_u in self._pending:
                continue
            file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version'])
            if self._should_download(relpath_u, metadata['version']):
                extension += [(relpath_u, file_node, metadata)]
        return extension

    def _when_queue_is_empty(self):
        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
        d.addCallback(lambda ign: self._turn_deque())
        return d

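    # Each queued item is a (relpath_u, file_node, metadata) tuple, as produced
    # by _filter_scan_batch above.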
    def _process(self, item):
        (relpath_u, file_node, metadata) = item
        d = file_node.download_best_version()
        def succeeded(res):
            fp = self._get_filepath(relpath_u)
            abspath_u = unicode_from_filepath(fp)
            d2 = defer.succeed(res)
            d2.addCallback(lambda result: self._write_downloaded_file(abspath_u, result, is_conflict=False))
            def do_update_db(written_abspath_u):
                filecap = file_node.get_uri()
                written_pathinfo = get_pathinfo(written_abspath_u)
                if not written_pathinfo.exists:
                    raise Exception("downloaded file %s disappeared" % quote_local_unicode_path(written_abspath_u))

                self._db.did_upload_version(filecap, relpath_u, metadata['version'], written_pathinfo)
            d2.addCallback(do_update_db)
            # XXX handle failure here with addErrback...
            self._count('objects_downloaded')
            return d2
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_download_failed')
            return f
        d.addCallbacks(succeeded, failed)
        def remove_from_pending(res):
            self._pending.remove(relpath_u)
            return res
        d.addBoth(remove_from_pending)
        return d
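    # Downloaded files get an mtime FUDGE_SECONDS in the past; this appears to
    # be so that a local write made immediately after the download still looks
    # newer than the downloaded version when timestamps are compared.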
    FUDGE_SECONDS = 10.0

    @classmethod
    def _write_downloaded_file(cls, abspath_u, file_contents, is_conflict=False, now=None):
        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        fileutil.write(replacement_path_u, file_contents)
        os.utime(replacement_path_u, (now, now - cls.FUDGE_SECONDS))
        if is_conflict:
            return cls._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                return cls._rename_conflicted_file(abspath_u, replacement_path_u)

    @classmethod
    def _rename_conflicted_file(cls, abspath_u, replacement_path_u):
        conflict_path_u = abspath_u + u".conflict"
        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u