import sys, os
import os.path
from collections import deque
import time

from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import HookMixin
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
     extend_filepath, unicode_from_filepath, unicode_segments_from, \
     quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
from allmydata import backupdb, magicpath


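# IN_EXCL_UNLINK comes from <linux/inotify.h>; Twisted's inotify module does
# not expose it, so it is defined locally here. It tells the kernel to stop
# reporting events for children after they are unlinked from the watched
# directory.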
IN_EXCL_UNLINK = 0x04000000L

def get_inotify_module():
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for drop-upload is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for drop-upload is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise

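# A sketch of how the returned module is used (it mirrors Twisted's inotify
# API; the allmydata.windows.inotify shim provides the same interface):
#
#     inotify = get_inotify_module()
#     notifier = inotify.INotify()
#     notifier.watch(to_filepath(u"/some/dir"), callbacks=[callback],
#                    recursive=True)
#     notifier.startReading()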

class MagicFolder(service.MultiService):
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
                 pending_delay=1.0, clock=reactor):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        db = backupdb.get_backupdb(dbfile, create_version=(backupdb.SCHEMA_v3, 3))
        if db is None:
            # __init__ cannot return a value, so report failure by raising.
            raise Exception('ERROR: Unable to load magic folder db.')

        # for tests
        self._client = client
        self._db = db

        self.is_ready = False

        self.uploader = Uploader(client, local_path_u, db, upload_dircap, pending_delay, clock)
        self.downloader = Downloader(client, local_path_u, db, collective_dircap, clock)

    def startService(self):
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        print "%r.startService" % (self,)
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """ready is used to signal us to start
        processing the upload and download items.
        """
        self.is_ready = True
        d = self.uploader.start_scanning()
        d2 = self.downloader.start_scanning()
        d.addCallback(lambda ign: d2)
        return d

    def finish(self):
        print "finish"
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        return service.MultiService.disownServiceParent(self)

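# A minimal wiring sketch (illustrative only; 'client' is a Tahoe client node
# and the dircaps, path, and dbfile are placeholders, not values defined in
# this module):
#
#     mf = MagicFolder(client, upload_dircap, collective_dircap,
#                      u"/home/alice/magic", dbfile)
#     mf.setServiceParent(client)
#     mf.startService()
#     mf.ready()   # start processing the upload and download queues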

class QueueMixin(HookMixin):
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        self._lazy_tail = defer.succeed(None)
        self._pending = set()
        self._stopped = False
        self._turn_delay = 0

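    # _get_filepath and _get_relpath convert between the u"/"-separated
    # relative paths used in the db and in dirnode entries, and FilePaths
    # under the local magic folder, e.g. u"a/b" <-> <local.directory>/a/b.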
    def _get_filepath(self, relpath_u):
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        print "_get_relpath(%r)" % (filepath,)
        segments = unicode_segments_from(filepath, self._local_filepath)
        print "segments = %r" % (segments,)
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        print "%r += %r" % (ctr, delta)
        self._client.stats_provider.count(ctr, delta)

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _append_to_deque(self, relpath_u):
        print "_append_to_deque(%r)" % (relpath_u,)
        if relpath_u in self._pending or magicpath.should_ignore_file(relpath_u):
            return
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            self._clock.callLater(0, self._turn_deque)

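    # Queue turns are serialized on self._lazy_tail: each turn processes one
    # item and then schedules the next turn after self._turn_delay seconds,
    # so at most one _process call is outstanding at a time.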
    def _turn_deque(self):
        if self._stopped:
            return
        try:
            item = self._deque.pop()
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))


class Uploader(QueueMixin):
    def __init__(self, client, local_path_u, db, upload_dircap, pending_delay, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False

        # TODO: allow a path rather than a cap URI.
        self._upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        if not IDirectoryNode.providedBy(self._upload_dirnode):
            raise AssertionError("The URI in 'private/magic_folder_dircap' does not refer to a directory.")
        if self._upload_dirnode.is_unknown() or self._upload_dirnode.is_readonly():
            raise AssertionError("The URI in 'private/magic_folder_dircap' is not a writecap to a directory.")

        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # We don't watch for IN_CREATE, because that would cause us to read and upload a
        # possibly-incomplete file before the application has closed it. There should always
        # be an IN_CLOSE_WRITE after an IN_CREATE (I think).
        # TODO: what about IN_MOVE_SELF, IN_MOVED_FROM, or IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
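        # IN_ONLYDIR makes the kernel refuse the watch if the path is not a
        # directory, and IN_EXCL_UNLINK (Linux >= 2.6.36) stops events for
        # children once they are unlinked. Presumably non-Linux notifier
        # implementations ignore flags they do not understand.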
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        self._log("start_scanning")
        self.is_ready = True
        self._pending = self._db.get_all_relpaths()
        print "all_files %r" % (self._pending,)
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already processed
            # (normally because they have been deleted on disk).
            print "adding %r" % (self._pending,)
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            assert isinstance(child, unicode), child
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def _notify(self, opaque, path, events_mask):
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)
        self._append_to_deque(relpath_u)

    def _when_queue_is_empty(self):
        return defer.succeed(None)

    def _process(self, relpath_u):
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val):
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            print "pending = %r, about to remove %r" % (self._pending, relpath_u)
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')
                d2 = defer.succeed(None)
                if self._db.check_file_db_exists(relpath_u):
                    d2.addCallback(lambda ign: self._get_metadata(encoded_path_u))
                    current_version = self._db.get_local_file_version(relpath_u) + 1
                    def set_deleted(metadata):
                        metadata['version'] = current_version
                        metadata['deleted'] = True
                        empty_uploadable = Data("", self._client.convergence)
                        return self._upload_dirnode.add_file(encoded_path_u, empty_uploadable, overwrite=True, metadata=metadata)
                    d2.addCallback(set_deleted)
                    def add_db_entry(filenode):
                        filecap = filenode.get_uri()
                        self._db.did_upload_version(filecap, relpath_u, current_version, pathinfo)
                        self._count('files_uploaded')
                    # FIXME consider whether it's correct to retrieve the filenode again.
                    d2.addCallback(lambda x: self._get_filenode(encoded_path_u))
                    d2.addCallback(add_db_entry)

                d2.addCallback(lambda x: Exception("file does not exist"))  # FIXME wrong
                return d2
            elif pathinfo.islink:
                self._log("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)
                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                version = self._db.get_local_file_version(relpath_u)
                if version is None:
                    version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    version += 1
                else:
                    return None

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":version}, overwrite=True)
                def add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    self._db.did_upload_version(filecap, relpath_u, version, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(add_db_entry)
                return d2
            else:
                self._log("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            print f
            self._count('objects_failed')
            self._log("%r while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

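    # Versioning sketch, as implemented above: a path never seen before is
    # uploaded as version 0; each subsequent local change bumps the version
    # by 1; a deletion is represented by uploading empty contents with
    # 'deleted': True metadata at the previous version + 1.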
    def _get_metadata(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d


class Downloader(QueueMixin):
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dircap, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        # TODO: allow a path rather than a cap URI.
        self._collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        if not IDirectoryNode.providedBy(self._collective_dirnode):
            raise AssertionError("The URI in 'private/collective_dircap' does not refer to a directory.")
        if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly():
            raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.")

        self._turn_delay = self.REMOTE_SCAN_INTERVAL
        self._download_scan_batch = {} # path -> [(filenode, metadata)]

    def start_scanning(self):
        self._log("\nstart_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective()
        self._turn_deque()
        return d

    def stop(self):
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.
        """
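        # For example: local db version 1 vs. remote version 2 -> download;
        # local db version 2 vs. remote version 2 -> skip (only a strictly
        # newer remote version is fetched).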
        if magicpath.should_ignore_file(relpath_u):
            return False
        v = self._db.get_local_file_version(relpath_u)
        return (v is None or v < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode relative path and checks whether a
        file exists at that path on disk; if not, it returns None, otherwise
        it returns the version number recorded for that file in our
        magic-folder db.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        return self._db.get_local_file_version(relpath_u)

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with a two-tuple of the
        file node and metadata for the latest version of that file in the
        magic-folder collective directory, or (None, None) if no version was
        found.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            # Start below zero so that a file at version 0 is still found.
            max_version = -1
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _append_to_batch(self, name, file_node, metadata):
        if name in self._download_scan_batch:
            self._download_scan_batch[name] += [(file_node, metadata)]
        else:
            self._download_scan_batch[name] = [(file_node, metadata)]

    def _scan_remote(self, nickname, dirnode):
        self._log("_scan_remote nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for name in listing_map.keys():
                file_node, metadata = listing_map[name]
                local_version = self._get_local_latest(name)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (name, local_version, remote_version))
                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("added to download queue\n")
                    self._append_to_batch(name, file_node, metadata)
        d.addCallback(scan_listing)
        return d

    def _scan_remote_collective(self):
        self._log("_scan_remote_collective")
        self._download_scan_batch = {} # XXX

        if self._collective_dirnode is None:
            # Callers chain on the result, so always return a Deferred.
            return defer.succeed(None)
        collective_dirmap_d = self._collective_dirnode.list()
        def do_list(result):
            others = list(result.keys())
            return result, others
        collective_dirmap_d.addCallback(do_list)
        def scan_collective(result):
            d = defer.succeed(None)
            collective_dirmap, others_list = result
            for dir_name in others_list:
                d.addCallback(lambda x, dir_name=dir_name: self._scan_remote(dir_name, collective_dirmap[dir_name][0]))
                # XXX todo add errback
            return d
        collective_dirmap_d.addCallback(scan_collective)
        collective_dirmap_d.addCallback(self._filter_scan_batch)
        collective_dirmap_d.addCallback(self._add_batch_to_download_queue)
        return collective_dirmap_d

    def _add_batch_to_download_queue(self, result):
        print "result = %r" % (result,)
        print "deque = %r" % (self._deque,)
        self._deque.extend(result)
        print "deque after = %r" % (self._deque,)
        self._count('objects_queued', len(result))
        print "pending = %r" % (self._pending,)
        self._pending.update(map(lambda x: x[0], result))
        print "pending after = %r" % (self._pending,)

    def _filter_scan_batch(self, result):
        extension = [] # consider whether this should be a dict
        for relpath_u in self._download_scan_batch.keys():
            if relpath_u in self._pending:
                continue
            file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version'])
            if self._should_download(relpath_u, metadata['version']):
                extension += [(relpath_u, file_node, metadata)]
        return extension

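    # When the download queue drains we do not stop; we poll the remote
    # collective again after REMOTE_SCAN_INTERVAL seconds and refill the
    # queue, so remote changes are picked up continuously.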
    def _when_queue_is_empty(self):
        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item):
        (relpath_u, file_node, metadata) = item
        d = file_node.download_best_version()
        def succeeded(res):
            fp = self._get_filepath(relpath_u)
            abspath_u = unicode_from_filepath(fp)
            d2 = defer.succeed(res)
            d2.addCallback(lambda result: self._write_downloaded_file(abspath_u, result, is_conflict=False))
            def do_update_db(written_abspath_u):
                filecap = file_node.get_uri()
                written_pathinfo = get_pathinfo(written_abspath_u)
                if not written_pathinfo.exists:
                    raise Exception("downloaded file %s disappeared" % quote_local_unicode_path(written_abspath_u))

                self._db.did_upload_version(filecap, relpath_u, metadata['version'], written_pathinfo)
            d2.addCallback(do_update_db)
            # XXX handle failure here with addErrback...
            self._count('objects_downloaded')
            return d2
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_download_failed')
            return f
        d.addCallbacks(succeeded, failed)
        def remove_from_pending(res):
            self._pending.remove(relpath_u)
            return res
        d.addBoth(remove_from_pending)
        return d

    FUDGE_SECONDS = 10.0

    @classmethod
    def _write_downloaded_file(cls, abspath_u, file_contents, is_conflict=False, now=None):
        # 1. Write a temporary file, say foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.
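        #
        # For example (sketch): downloading "foo" first writes "foo.tmp";
        # on success "foo.tmp" atomically replaces "foo", preserving the old
        # contents as "foo.backup"; on conflict the new contents end up in
        # "foo.conflict" and "foo" is left alone.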

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)
        mode = 0777 # XXX
        fileutil.make_dirs(head, mode)

        fileutil.write(replacement_path_u, file_contents)
        os.utime(replacement_path_u, (now, now - cls.FUDGE_SECONDS))
        if is_conflict:
            return cls._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                return cls._rename_conflicted_file(abspath_u, replacement_path_u)

    @classmethod
    def _rename_conflicted_file(cls, abspath_u, replacement_path_u):
        conflict_path_u = abspath_u + u".conflict"
        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u