5 from collections import deque
8 from twisted.internet import defer, reactor, task
9 from twisted.python.failure import Failure
10 from twisted.python import runtime
11 from twisted.application import service
13 from allmydata.util import fileutil
14 from allmydata.interfaces import IDirectoryNode
15 from allmydata.util import log
16 from allmydata.util.fileutil import precondition_abspath, get_pathinfo
17 from allmydata.util.assertutil import precondition
18 from allmydata.util.deferredutil import HookMixin
19 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
20 extend_filepath, unicode_from_filepath, unicode_segments_from, \
21 quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
22 from allmydata.immutable.upload import FileName, Data
23 from allmydata import magicfolderdb, magicpath
# inotify flag: stop reporting events for children after they are unlinked
# from the watched directory. Defined here because (presumably) the inotify
# module in use does not export it -- the value matches the Linux kernel's
# IN_EXCL_UNLINK constant; confirm against <sys/inotify.h>.
# NOTE(review): the trailing 'L' long-literal suffix is Python 2-only syntax.
26 IN_EXCL_UNLINK = 0x04000000L
# get_inotify_module: select a filesystem-notification implementation.
# On Windows, use allmydata's inotify emulation; where Twisted reports native
# inotify support, use twisted.internet.inotify; otherwise raise
# NotImplementedError. Import failures (ImportError/AttributeError) are
# re-reported, with a Windows-specific message when on win32.
# NOTE(review): this excerpt is missing interior lines (gaps in the original
# numbering) -- at least the enclosing `try:`, an `else:` before the first
# raise, and the `return inotify` -- so the control flow below is incomplete;
# confirm against the complete file. `sys` is presumably imported on an
# elided top-of-file line.
28 def get_inotify_module():
30 if sys.platform == "win32":
31 from allmydata.windows import inotify
32 elif runtime.platform.supportsINotify():
33 from twisted.internet import inotify
# (elided `else:` presumably precedes this raise)
35 raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
36 "This currently requires Linux or Windows.")
# (elided `try:` opener precedes this handler)
38 except (ImportError, AttributeError) as e:
40 if sys.platform == "win32":
41 raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
42 "Windows support requires at least Vista, and has only been tested on Windows 7.")
# MagicFolder: a Twisted MultiService pairing an Uploader and a Downloader
# over one local directory and a shared magic-folder database.
# NOTE(review): this excerpt is missing interleaved original lines (gaps in
# the numbering), so several statements below lack their surrounding control
# flow; confirm details against the complete file.
46 class MagicFolder(service.MultiService):
# __init__: requires an absolute local path; opens the magicfolder db at
# `dbfile` and builds the uploader (upload writecap) and downloader
# (collective readcap) sharing that db. The visible `return Failure(...)`
# presumably sits under an elided `if db is None:` check -- TODO confirm.
49 def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
50 pending_delay=1.0, clock=reactor):
51 precondition_abspath(local_path_u)
53 service.MultiService.__init__(self)
55 db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
57 return Failure(Exception('ERROR: Unable to load magic folder db.'))
65 self.uploader = Uploader(client, local_path_u, db, upload_dircap, pending_delay, clock)
66 self.downloader = Downloader(client, local_path_u, db, collective_dircap, clock)
# startService: start the service tree and return the deferred from the
# uploader's inotify start-up. The early `return defer.succeed(None)` is
# presumably guarded by an elided "already running" check (see the TODO).
# NOTE(review): `print` statement syntax confirms this file is Python 2.
68 def startService(self):
69 # TODO: why is this being called more than once?
71 return defer.succeed(None)
72 print "%r.startService" % (self,)
73 service.MultiService.startService(self)
74 return self.uploader.start_monitoring()
# (The docstring fragment below belongs to an elided `ready()` method that
# starts uploader/downloader scanning and chains the two deferreds --
# judged from the calls that follow; its closing quotes are also elided.)
77 """ready is used to signal us to start
78 processing the upload and download items...
81 d = self.uploader.start_scanning()
82 d2 = self.downloader.start_scanning()
83 d.addCallback(lambda ign: d2)
# (Body of an elided stop/finish method: stop both workers, sequencing the
# downloader's stop after the uploader's.)
88 d = self.uploader.stop()
89 d2 = self.downloader.stop()
90 d.addCallback(lambda ign: d2)
# remove_service: detach this service from its parent MultiService.
93 def remove_service(self):
94 return service.MultiService.disownServiceParent(self)
# QueueMixin: machinery shared by Uploader and Downloader. Validates the
# configured local directory, keeps a deque of queued work items plus a
# `_pending` set of in-flight relative paths, and processes one item at a
# time by chaining callbacks onto the `_lazy_tail` deferred. Subclasses
# supply _process() and _when_queue_is_empty().
# NOTE(review): interior lines are elided throughout (gaps in the original
# numbering); hedged notes below mark the spots that matter.
97 class QueueMixin(HookMixin):
# __init__: record the local path both as unicode and as a FilePath, and
# register the 'processed'/'started' hooks (HookMixin protocol). The
# assignments of self._client/_db/_name/_clock appear on elided lines --
# those attributes are used below (e.g. in _count and _turn_deque).
98 def __init__(self, client, local_path_u, db, name, clock):
100 self._local_path_u = local_path_u
101 self._local_filepath = to_filepath(local_path_u)
105 self._hooks = {'processed': None, 'started': None}
106 self.started_d = self.set_hook('started')
108 if not self._local_filepath.exists():
109 raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
110 "but there is no directory at that location."
111 % quote_local_unicode_path(self._local_path_u))
112 if not self._local_filepath.isdir():
113 raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
114 "but the thing at that location is not a directory."
115 % quote_local_unicode_path(self._local_path_u))
# Work-queue state: queued items, the serialized-processing deferred chain,
# the set of paths queued-or-being-processed, and a stop flag.
117 self._deque = deque()
118 self._lazy_tail = defer.succeed(None)
119 self._pending = set()
120 self._stopped = False
# _get_filepath: relative unicode path (u"/"-separated) -> FilePath under
# the local directory.
123 def _get_filepath(self, relpath_u):
124 return extend_filepath(self._local_filepath, relpath_u.split(u"/"))
# _get_relpath: FilePath under the local directory -> u"/"-joined relative
# unicode path (inverse of _get_filepath).
126 def _get_relpath(self, filepath):
127 self._log("_get_relpath(%r)" % (filepath,))
128 segments = unicode_segments_from(filepath, self._local_filepath)
129 self._log("segments = %r" % (segments,))
130 return u"/".join(segments)
# _count: bump a stats-provider counter namespaced by this queue's name.
132 def _count(self, counter_name, delta=1):
133 ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
134 self._log("%s += %r" % (counter_name, delta))
135 self._client.stats_provider.count(ctr, delta)
# _logcb: log-and-passthrough callback helper for deferred chains.
# NOTE(review): the `return res` line appears to be elided.
137 def _logcb(self, res, msg):
138 self._log("%s: %r" % (msg, res))
# (Body fragment of the elided _log method: prefix messages with nickname
# and queue name.)
142 s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
145 #open("events", "ab+").write(msg)
# _append_to_deque: skip paths already pending or ignorable (the visible
# `if` body is elided -- presumably `return`); otherwise queue the path,
# mark it pending, bump the counter, and schedule a queue turn on the next
# reactor/clock tick.
147 def _append_to_deque(self, relpath_u):
148 self._log("_append_to_deque(%r)" % (relpath_u,))
149 if relpath_u in self._pending or magicpath.should_ignore_file(relpath_u):
151 self._deque.append(relpath_u)
152 self._pending.add(relpath_u)
153 self._count('objects_queued')
155 self._clock.callLater(0, self._turn_deque)
# _turn_deque: take one item off the deque and chain its processing onto
# _lazy_tail; on an empty deque (elided except/IndexError handling,
# presumably) call _when_queue_is_empty(); after processing, schedule the
# next turn after `_turn_delay`.
# NOTE(review): append() paired with pop() drains the deque LIFO; if FIFO
# order is intended this should be popleft() -- confirm against the full
# file. `_turn_delay` is not set in the visible __init__ (Downloader sets
# it at construction; the Uploader's value must be on an elided line).
157 def _turn_deque(self):
158 self._log("_turn_deque")
163 item = self._deque.pop()
164 self._log("popped %r" % (item,))
165 self._count('objects_queued', -1)
167 self._log("deque is now empty")
168 self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
170 self._lazy_tail.addCallback(lambda ign: self._process(item))
171 self._lazy_tail.addBoth(self._call_hook, 'processed')
172 self._lazy_tail.addErrback(log.err)
173 self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
# Uploader: watches the local magic folder with inotify (or the Windows
# emulation) and uploads new/changed files into the upload dirnode, recording
# each upload in the magic-folder db.
# NOTE(review): interior lines are elided throughout this class (gaps in the
# original numbering); hedged notes below mark the consequential spots.
176 class Uploader(QueueMixin):
# __init__: resolve the upload dircap, require a writeable directory node,
# build the inotify watcher with the event mask below, and register _notify
# as the callback on the local directory (the watch() call's trailing
# arguments -- e.g. recursive/autoAdd -- are on an elided line).
177 def __init__(self, client, local_path_u, db, upload_dircap, pending_delay, clock):
178 QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)
180 self.is_ready = False
182 # TODO: allow a path rather than a cap URI.
183 self._upload_dirnode = self._client.create_node_from_uri(upload_dircap)
184 if not IDirectoryNode.providedBy(self._upload_dirnode):
185 raise AssertionError("The URI in 'private/magic_folder_dircap' does not refer to a directory.")
186 if self._upload_dirnode.is_unknown() or self._upload_dirnode.is_readonly():
187 raise AssertionError("The URI in 'private/magic_folder_dircap' is not a writecap to a directory.")
189 self._inotify = get_inotify_module()
190 self._notifier = self._inotify.INotify()
# set_pending_delay is optional on the notifier implementation (hasattr
# check), so this works with both backends.
192 if hasattr(self._notifier, 'set_pending_delay'):
193 self._notifier.set_pending_delay(pending_delay)
195 # TODO: what about IN_MOVE_SELF, IN_MOVED_FROM, or IN_UNMOUNT?
# Event mask; the closing line (presumably adding IN_EXCL_UNLINK and the
# closing paren) is elided.
197 self.mask = ( self._inotify.IN_CREATE
198 | self._inotify.IN_CLOSE_WRITE
199 | self._inotify.IN_MOVED_TO
200 | self._inotify.IN_MOVED_FROM
201 | self._inotify.IN_DELETE
202 | self._inotify.IN_ONLYDIR
205 self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
# start_monitoring: begin reading inotify events, bump the monitored-dirs
# counter, and fire the 'started' hook; returns the deferred (elided
# `return d`, presumably).
208 def start_monitoring(self):
209 self._log("start_monitoring")
210 d = defer.succeed(None)
211 d.addCallback(lambda ign: self._notifier.startReading())
212 d.addCallback(lambda ign: self._count('dirs_monitored'))
213 d.addBoth(self._call_hook, 'started')
# (Body of an elided stop() method: stop reading events, then wait for the
# notifier to stop if it supports that, then drain _lazy_tail.)
218 self._notifier.stopReading()
219 self._count('dirs_monitored', -1)
220 if hasattr(self._notifier, 'wait_until_stopped'):
221 d = self._notifier.wait_until_stopped()
223 d = defer.succeed(None)
224 d.addCallback(lambda ign: self._lazy_tail)
# start_scanning: seed _pending from every relpath known to the db, scan the
# local tree (the _scan call is on an elided line, presumably), then queue
# whatever remains in _pending -- i.e. db entries with no corresponding
# on-disk file, normally deletions -- and start turning the deque.
227 def start_scanning(self):
228 self._log("start_scanning")
230 self._pending = self._db.get_all_relpaths()
231 self._log("all_files %r" % (self._pending))
233 def _add_pending(ign):
234 # This adds all of the files that were in the db but not already processed
235 # (normally because they have been deleted on disk).
236 self._log("adding %r" % (self._pending))
237 self._deque.extend(self._pending)
238 d.addCallback(_add_pending)
239 d.addCallback(lambda ign: self._turn_deque())
# _scan: list one local directory (relative u-path) and chain each child
# into _pending / _process. Listing errors are converted to warnings-as-
# exceptions. The `try:` before listdir_filepath is elided.
242 def _scan(self, reldir_u):
243 self._log("scan %r" % (reldir_u,))
244 fp = self._get_filepath(reldir_u)
246 children = listdir_filepath(fp)
247 except EnvironmentError:
248 raise Exception("WARNING: magic folder: permission denied on directory %s"
249 % quote_filepath(fp))
250 except FilenameEncodingError:
251 raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
252 % quote_filepath(fp))
254 d = defer.succeed(None)
255 for child in children:
# `unicode` builtin => Python 2; listdir_filepath yields unicode names.
256 assert isinstance(child, unicode), child
# child is bound as a default argument to avoid the late-binding-closure
# pitfall in this loop.
257 d.addCallback(lambda ign, child=child:
258 ("%s/%s" % (reldir_u, child) if reldir_u else child))
259 def _add_pending(relpath_u):
260 if magicpath.should_ignore_file(relpath_u):
263 self._pending.add(relpath_u)
# (elided: _add_pending presumably returns relpath_u for the next callback)
265 d.addCallback(_add_pending)
266 # This call to _process doesn't go through the deque, and probably should.
267 d.addCallback(self._process)
268 d.addBoth(self._call_hook, 'processed')
269 d.addErrback(log.err)
# _notify: inotify callback; translate the notified FilePath to a relative
# path and queue it, except for IN_CREATE events on non-directories (see
# comment below).
273 def _notify(self, opaque, path, events_mask):
274 self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
276 # We filter out IN_CREATE events not associated with a directory.
277 # Acting on IN_CREATE for files could cause us to read and upload
278 # a possibly-incomplete file before the application has closed it.
279 # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
280 # It isn't possible to avoid watching for IN_CREATE at all, because
281 # it is the only event notified for a directory creation.
283 if ((events_mask & self._inotify.IN_CREATE) != 0 and
284 (events_mask & self._inotify.IN_ISDIR) == 0):
285 self._log("ignoring inotify event for creation of file %r\n" % (path,))
# (elided `return` presumably ends the ignore branch here)
288 relpath_u = self._get_relpath(path)
289 self._append_to_deque(relpath_u)
# _when_queue_is_empty: the uploader is event-driven, so an empty queue
# requires no extra work.
291 def _when_queue_is_empty(self):
292 return defer.succeed(None)
# _process: handle one queued relative path. Branches on what is now on
# disk: vanished (upload a deletion marker), symlink (warn), directory
# (create remote subdir then scan it), regular file (upload contents), or
# special file (warn). Successful uploads are recorded in the db via
# did_upload_version. Many guard/else lines are elided here (e.g. the
# `new_version = 0` defaults after the `current_version is None` checks and
# the early-return bodies) -- confirm against the full file.
294 def _process(self, relpath_u):
295 self._log("_process(%r)" % (relpath_u,))
296 if relpath_u is None:
298 precondition(isinstance(relpath_u, unicode), relpath_u)
300 d = defer.succeed(None)
302 def _maybe_upload(val, now=None):
305 fp = self._get_filepath(relpath_u)
306 pathinfo = get_pathinfo(unicode_from_filepath(fp))
308 self._log("pending = %r, about to remove %r" % (self._pending, relpath_u))
309 self._pending.remove(relpath_u)
310 encoded_path_u = magicpath.path2magic(relpath_u)
312 if not pathinfo.exists:
313 # FIXME merge this with the 'isfile' case.
314 self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
315 self._count('objects_disappeared')
# If the db has never seen this path there is nothing to mark deleted
# (the branch body is elided, presumably an early return).
316 if not self._db.check_file_db_exists(relpath_u):
319 last_downloaded_timestamp = now
320 last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
322 current_version = self._db.get_local_file_version(relpath_u)
323 if current_version is None:
325 elif self._db.is_new_file(pathinfo, relpath_u):
326 new_version = current_version + 1
328 self._log("ignoring {}".format(relpath_u))
# Deletion is represented remotely as an empty file with (presumably, on
# the elided line 332) 'deleted': True in its metadata.
331 metadata = { 'version': new_version,
333 'last_downloaded_timestamp': last_downloaded_timestamp }
334 if last_downloaded_uri is not None:
335 metadata['last_downloaded_uri'] = last_downloaded_uri
337 empty_uploadable = Data("", self._client.convergence)
338 d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
339 metadata=metadata, overwrite=True)
341 def _add_db_entry(filenode):
342 filecap = filenode.get_uri()
343 self._db.did_upload_version(relpath_u, new_version, filecap,
344 last_downloaded_uri, last_downloaded_timestamp, pathinfo)
345 self._count('files_uploaded')
346 d2.addCallback(_add_db_entry)
# (elided: presumably `return d2`)
348 elif pathinfo.islink:
349 self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
# (elided `elif pathinfo.isdir:` presumably opens this branch)
# Directories are represented remotely as an entry whose encoded name ends
# with the magic-encoded u"/"; watch the new directory and scan it.
352 self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)
353 uploadable = Data("", self._client.convergence)
354 encoded_path_u += magicpath.path2magic(u"/")
355 upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
# (elided `def _succeeded(ign):` / `def _failed(f):` headers around these)
357 self._log("created subdirectory %r" % (relpath_u,))
358 self._count('directories_created')
360 self._log("failed to create subdirectory %r" % (relpath_u,))
362 upload_d.addCallbacks(_succeeded, _failed)
363 upload_d.addCallback(lambda ign: self._scan(relpath_u))
365 elif pathinfo.isfile:
366 last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
367 last_downloaded_timestamp = now
369 current_version = self._db.get_local_file_version(relpath_u)
370 if current_version is None:
372 elif self._db.is_new_file(pathinfo, relpath_u):
373 new_version = current_version + 1
377 metadata = { 'version': new_version,
378 'last_downloaded_timestamp': last_downloaded_timestamp }
379 if last_downloaded_uri is not None:
380 metadata['last_downloaded_uri'] = last_downloaded_uri
382 uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
383 d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
384 metadata=metadata, overwrite=True)
386 def _add_db_entry(filenode):
387 filecap = filenode.get_uri()
388 last_downloaded_uri = metadata.get('last_downloaded_uri', None)
389 self._db.did_upload_version(relpath_u, new_version, filecap,
390 last_downloaded_uri, last_downloaded_timestamp, pathinfo)
391 self._count('files_uploaded')
392 d2.addCallback(_add_db_entry)
# (elided `else:` presumably precedes the special-file warning)
395 self.warn("WARNING: cannot process special file %s" % quote_filepath(fp))
398 d.addCallback(_maybe_upload)
# (elided `def _succeeded(res):` / `def _failed(f):` headers around these)
401 self._count('objects_succeeded')
404 self._count('objects_failed')
405 self._log("%r while processing %r" % (f, relpath_u))
407 d.addCallbacks(_succeeded, _failed)
# _get_metadata / _get_filenode: thin lookups against the upload dirnode;
# their try/except wrappers and returns are elided.
410 def _get_metadata(self, encoded_path_u):
412 d = self._upload_dirnode.get_metadata_for(encoded_path_u)
417 def _get_filenode(self, encoded_path_u):
419 d = self._upload_dirnode.get(encoded_path_u)
# WriteFileMixin: safe-write helper shared with Downloader. Writes
# downloaded contents to a temporary sibling, backdates its mtime, then
# replaces the destination (keeping a .backup), diverting to a .conflict
# file when a conflict is detected. FUDGE_SECONDS is referenced below but
# its definition is on an elided line. `os` is presumably imported on an
# elided top-of-file line.
425 class WriteFileMixin(object):
# _write_downloaded_file: algorithm documented inline below; returns the
# path the contents ended up at (destination or .conflict). The
# `if is_conflict:` guard before line 456 and the try: before
# replace_file appear to be elided -- confirm against the full file.
428 def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
429 self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
430 % (abspath_u, len(file_contents), is_conflict, now))
432 # 1. Write a temporary file, say .foo.tmp.
433 # 2. is_conflict determines whether this is an overwrite or a conflict.
434 # 3. Set the mtime of the replacement file to be T seconds before the
435 # current local time.
436 # 4. Perform a file replacement with backup filename foo.backup,
437 # replaced file foo, and replacement file .foo.tmp. If any step of
438 # this operation fails, reclassify as a conflict and stop.
440 # Returns the path of the destination file.
442 precondition_abspath(abspath_u)
443 replacement_path_u = abspath_u + u".tmp" # FIXME more unique
444 backup_path_u = abspath_u + u".backup"
448 # ensure parent directory exists
449 head, tail = os.path.split(abspath_u)
451 fileutil.make_dirs(head, mode)
453 fileutil.write(replacement_path_u, file_contents)
# Backdate mtime so a subsequent local write is detected as newer.
454 os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
456 return self._rename_conflicted_file(abspath_u, replacement_path_u)
457 fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
# Wait -- line numbering shows 459 here; the elided 457/458 presumably
# hold the `else:`/`try:` wrapping this replace_file call.
461 except fileutil.ConflictError:
462 return self._rename_conflicted_file(abspath_u, replacement_path_u)
# _rename_conflicted_file: move the replacement into place as
# <abspath>.conflict (never overwriting) and return that path.
464 def _rename_conflicted_file(self, abspath_u, replacement_path_u):
465 self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))
467 conflict_path_u = abspath_u + u".conflict"
468 fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
469 return conflict_path_u
# Downloader: periodically scans the read-only collective directory for
# versions newer than the local db, batches candidates, and downloads/writes
# them locally via WriteFileMixin.
# NOTE(review): interior lines are elided throughout this class (gaps in the
# original numbering); hedged notes below mark the consequential spots.
472 class Downloader(QueueMixin, WriteFileMixin):
473 REMOTE_SCAN_INTERVAL = 3 # facilitates tests
# __init__: resolve the collective dircap and require a read-only directory
# node; set the queue-turn delay to the remote scan interval.
475 def __init__(self, client, local_path_u, db, collective_dircap, clock):
476 QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)
478 # TODO: allow a path rather than a cap URI.
479 self._collective_dirnode = self._client.create_node_from_uri(collective_dircap)
481 if not IDirectoryNode.providedBy(self._collective_dirnode):
482 raise AssertionError("The URI in 'private/collective_dircap' does not refer to a directory.")
483 if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly():
484 raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.")
486 self._turn_delay = self.REMOTE_SCAN_INTERVAL
# Batch of remote candidates accumulated during one collective scan.
487 self._download_scan_batch = {} # path -> [(filenode, metadata)]
# start_scanning: kick off the first remote collective scan; the chaining
# to _turn_deque and the return are on elided lines, presumably.
489 def start_scanning(self):
490 self._log("start_scanning")
491 files = self._db.get_all_relpaths()
492 self._log("all files %s" % files)
494 d = self._scan_remote_collective()
# (Body fragment of an elided stop() method: drain _lazy_tail.)
500 d = defer.succeed(None)
501 d.addCallback(lambda ign: self._lazy_tail)
# _should_download: its docstring's opening/closing quotes are elided; body:
# ignore magicpath-excluded files (branch body elided, presumably returning
# False), otherwise download when there is no local version or it is older.
504 def _should_download(self, relpath_u, remote_version):
506 _should_download returns a bool indicating whether or not a remote object should be downloaded.
507 We check the remote metadata version against our magic-folder db version number;
510 self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
511 if magicpath.should_ignore_file(relpath_u):
515 v = self._db.get_local_file_version(relpath_u)
516 self._log("v = %r" % (v,))
517 return (v is None or v < remote_version)
# _get_local_latest: local db version for a path, or (on the elided branch
# body) None when the file does not exist on disk. Docstring quotes elided.
519 def _get_local_latest(self, relpath_u):
521 _get_local_latest takes a unicode path string checks to see if this file object
522 exists in our magic-folder db; if not then return None
523 else check for an entry in our magic-folder db and return the version number.
525 if not self._get_filepath(relpath_u).exists():
527 return self._db.get_local_file_version(relpath_u)
# _get_collective_latest_file: query every participant directory for
# `filename` and resolve to the (node, metadata) pair with the highest
# 'version'. Uses consumeErrors so missing entries don't fail the whole
# DeferredList; the `if success:` guard and max_version initialization in
# highest_version are on elided lines, presumably.
529 def _get_collective_latest_file(self, filename):
531 _get_collective_latest_file takes a file path pointing to a file managed by
532 magic-folder and returns a deferred that fires with the two tuple containing a
533 file node and metadata for the latest version of the file located in the
534 magic-folder collective directory.
536 collective_dirmap_d = self._collective_dirnode.list()
537 def scan_collective(result):
538 list_of_deferreds = []
539 for dir_name in result.keys():
540 # XXX make sure it's a directory
541 d = defer.succeed(None)
# dir_name bound as a default argument to avoid late-binding in the loop.
542 d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
543 list_of_deferreds.append(d)
544 deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
546 collective_dirmap_d.addCallback(scan_collective)
547 def highest_version(deferredList):
551 for success, result in deferredList:
553 if result[1]['version'] > max_version:
554 node, metadata = result
555 max_version = result[1]['version']
556 return node, metadata
557 collective_dirmap_d.addCallback(highest_version)
558 return collective_dirmap_d
# _append_to_batch: accumulate (node, metadata) candidates per path.
# NOTE(review): dict.has_key is Python 2-only (removed in Python 3).
560 def _append_to_batch(self, name, file_node, metadata):
561 if self._download_scan_batch.has_key(name):
562 self._download_scan_batch[name] += [(file_node, metadata)]
564 self._download_scan_batch[name] = [(file_node, metadata)]
# _scan_remote: list one participant's directory and batch every entry whose
# remote version is newer than (or absent from) our local state. The
# creation of `d` (presumably `d = dirnode.list()`) is on an elided line.
566 def _scan_remote(self, nickname, dirnode):
567 self._log("_scan_remote nickname %r" % (nickname,))
569 def scan_listing(listing_map):
570 for encoded_relpath_u in listing_map.keys():
571 relpath_u = magicpath.magic2path(encoded_relpath_u)
572 self._log("found %r" % (relpath_u,))
574 file_node, metadata = listing_map[encoded_relpath_u]
575 local_version = self._get_local_latest(relpath_u)
576 remote_version = metadata.get('version', None)
577 self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))
578 if local_version is None or remote_version is None or local_version < remote_version:
579 self._log("%r added to download queue" % (relpath_u,))
580 self._append_to_batch(relpath_u, file_node, metadata)
581 d.addCallback(scan_listing)
582 d.addBoth(self._logcb, "end of _scan_remote")
# _scan_remote_collective: reset the batch, scan every participant
# directory in sequence, filter the batch against _pending/_should_download,
# and extend the download queue with what remains. The `def do_list(result):`
# header before line 593 is elided.
585 def _scan_remote_collective(self):
586 self._log("_scan_remote_collective")
587 self._download_scan_batch = {} # XXX
589 if self._collective_dirnode is None:
591 collective_dirmap_d = self._collective_dirnode.list()
593 others = [x for x in result.keys()]
594 return result, others
595 collective_dirmap_d.addCallback(do_list)
596 def scan_collective(result):
597 d = defer.succeed(None)
598 collective_dirmap, others_list = result
599 for dir_name in others_list:
600 d.addCallback(lambda x, dir_name=dir_name: self._scan_remote(dir_name, collective_dirmap[dir_name][0]))
601 # XXX todo add errback
603 collective_dirmap_d.addCallback(scan_collective)
604 collective_dirmap_d.addCallback(self._filter_scan_batch)
605 collective_dirmap_d.addCallback(self._add_batch_to_download_queue)
606 return collective_dirmap_d
# _add_batch_to_download_queue: queue (relpath, node, metadata) triples and
# mark their relpaths pending (x[0] is the relpath).
608 def _add_batch_to_download_queue(self, result):
609 self._log("result = %r" % (result,))
610 self._log("deque = %r" % (self._deque,))
611 self._deque.extend(result)
612 self._log("deque after = %r" % (self._deque,))
613 self._count('objects_queued', len(result))
614 self._log("pending = %r" % (self._pending,))
615 self._pending.update(map(lambda x: x[0], result))
616 self._log("pending after = %r" % (self._pending,))
# _filter_scan_batch: keep, per path, the highest-version candidate that
# passes _should_download and is not already pending; count exclusions.
# The `return extension` appears to be on an elided line.
618 def _filter_scan_batch(self, result):
619 self._log("_filter_scan_batch")
620 extension = [] # consider whether this should be a dict
621 for relpath_u in self._download_scan_batch.keys():
622 if relpath_u in self._pending:
624 file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version'])
625 if self._should_download(relpath_u, metadata['version']):
626 extension += [(relpath_u, file_node, metadata)]
628 self._count('objects_excluded')
629 self._call_hook(None, 'processed')
# _when_queue_is_empty: re-scan the collective after _turn_delay, then turn
# the deque again -- this is what makes downloading periodic.
632 def _when_queue_is_empty(self):
633 d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
634 d.addBoth(self._logcb, "after _scan_remote_collective")
635 d.addCallback(lambda ign: self._turn_deque())
# _process: handle one queued (relpath, node, metadata) item: mkdir for
# directory entries (relpath ending in u"/"), otherwise download the best
# version and either unlink (deleted) or write it locally, then record the
# result in the db. The `else:` between the mkdir and download branches is
# presumably on an elided line, as is the `def failed(f):` header.
638 def _process(self, item, now=None):
639 self._log("_process(%r)" % (item,))
642 (relpath_u, file_node, metadata) = item
643 fp = self._get_filepath(relpath_u)
644 abspath_u = unicode_from_filepath(fp)
646 d = defer.succeed(None)
647 if relpath_u.endswith(u"/"):
648 self._log("mkdir(%r)" % (abspath_u,))
649 d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
650 d.addCallback(lambda ign: abspath_u)
652 d.addCallback(lambda ign: file_node.download_best_version())
653 if metadata.get('deleted', False):
654 d.addCallback(lambda result: self._unlink_deleted_file(abspath_u, result))
656 d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents, is_conflict=False))
658 def do_update_db(written_abspath_u):
659 filecap = file_node.get_uri()
660 last_uploaded_uri = metadata.get('last_uploaded_uri', None)
661 last_downloaded_uri = filecap
662 last_downloaded_timestamp = now
663 written_pathinfo = get_pathinfo(written_abspath_u)
664 if not written_pathinfo.exists and not metadata.get('deleted', False):
665 raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))
667 self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
668 last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
669 self._count('objects_downloaded')
671 self._log("download failed: %s" % (str(f),))
672 self._count('objects_failed')
674 d.addCallbacks(do_update_db, failed)
675 def remove_from_pending(res):
676 self._pending.remove(relpath_u)
678 d.addBoth(remove_from_pending)
# _unlink_deleted_file: move the deleted file aside to <abspath>.backup; the
# try/except (presumably OSError, logging "Already gone") wrapping the move
# is on elided lines. `shutil` is presumably imported on an elided
# top-of-file line.
681 def _unlink_deleted_file(self, abspath_u, result):
683 self._log('unlinking: %s' % (abspath_u,))
684 shutil.move(abspath_u, abspath_u + '.backup')
686 self._log("Already gone: '%s'" % (abspath_u,))