5 from collections import deque
8 from twisted.internet import defer, reactor, task
9 from twisted.python.failure import Failure
10 from twisted.python import runtime
11 from twisted.application import service
13 from allmydata.util import fileutil
14 from allmydata.interfaces import IDirectoryNode
15 from allmydata.util import log
16 from allmydata.util.fileutil import precondition_abspath, get_pathinfo
17 from allmydata.util.assertutil import precondition
18 from allmydata.util.deferredutil import HookMixin
19 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
20 extend_filepath, unicode_from_filepath, unicode_segments_from, \
21 quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
22 from allmydata.immutable.upload import FileName, Data
23 from allmydata import magicfolderdb, magicpath
# Linux inotify flag: stop generating events for children once the watched
# directory itself has been unlinked.  (Python 2 long literal; value matches
# the kernel's IN_EXCL_UNLINK.)
26 IN_EXCL_UNLINK = 0x04000000L
def get_inotify_module():
    """
    Return a platform-appropriate filesystem-notification module.

    On Windows this is allmydata.windows.inotify; on platforms where Twisted
    reports native inotify support (Linux) it is twisted.internet.inotify.

    Raises NotImplementedError when the platform is unsupported, or when the
    platform module fails to import (e.g. pre-Vista Windows).
    """
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        # NOTE(review): the excerpt elides the tail of this handler; the
        # reconstructed behavior is to re-raise the original import failure.
        raise
# MagicFolder: top-level Twisted MultiService tying one local directory to a
# Tahoe-LAFS magic folder.  It opens the magic-folder db, then builds an
# Uploader (pushes local changes into upload_dircap) and a Downloader (pulls
# changes from the read-only collective_dircap).
# NOTE(review): this excerpt elides several original lines; the visible text
# is annotated as-is, not repaired.
46 class MagicFolder(service.MultiService):
# client: Tahoe client, used for node creation and stats.
# upload_dircap / collective_dircap: cap URIs for our write directory and the
#   collective directory.
# local_path_u: absolute unicode path of the watched directory (enforced by
#   precondition_abspath below).
# dbfile: path to the magic-folder sqlite db.
# pending_delay: inotify coalescing delay handed to the Uploader.
49 def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
50 pending_delay=1.0, clock=reactor):
51 precondition_abspath(local_path_u)
53 service.MultiService.__init__(self)
55 db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
# NOTE(review): returning a Failure from __init__ has no effect on the
# caller; the guard line that precedes this return is elided from this
# excerpt -- confirm against the full source.
57 return Failure(Exception('ERROR: Unable to load magic folder db.'))
65 self.uploader = Uploader(client, local_path_u, db, upload_dircap, pending_delay, clock)
66 self.downloader = Downloader(client, local_path_u, db, collective_dircap, clock)
68 def startService(self):
69 # TODO: why is this being called more than once?
# Early-exit path; its guard condition is elided from this excerpt.
71 return defer.succeed(None)
72 print "%r.startService" % (self,)
73 service.MultiService.startService(self)
# Fires once the uploader's inotify watch is actually reading events.
74 return self.uploader.start_monitoring()
77 """ready is used to signal us to start
78 processing the upload and download items...
# Start both initial scans; the result chains uploader first, then the
# downloader's Deferred.
81 d = self.uploader.start_scanning()
82 d2 = self.downloader.start_scanning()
83 d.addCallback(lambda ign: d2)
# Shutdown: stop the uploader, then wait for the downloader to stop too.
88 d = self.uploader.stop()
89 d2 = self.downloader.stop()
90 d.addCallback(lambda ign: d2)
93 def remove_service(self):
94 return service.MultiService.disownServiceParent(self)
# QueueMixin: machinery shared by Uploader and Downloader -- a deque of
# pending work items drained one at a time via a lazily-extended Deferred
# chain (self._lazy_tail), plus stats counters and 'started'/'processed'
# hooks for observers.
97 class QueueMixin(HookMixin):
98 def __init__(self, client, local_path_u, db, name, clock):
100 self._local_path_u = local_path_u
101 self._local_filepath = to_filepath(local_path_u)
# Hooks let callers/tests observe lifecycle events (see HookMixin).
105 self._hooks = {'processed': None, 'started': None}
106 self.started_d = self.set_hook('started')
# Fail fast if the configured local directory is absent or not a directory.
108 if not self._local_filepath.exists():
109 raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
110 "but there is no directory at that location."
111 % quote_local_unicode_path(self._local_path_u))
112 if not self._local_filepath.isdir():
113 raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
114 "but the thing at that location is not a directory."
115 % quote_local_unicode_path(self._local_path_u))
117 self._deque = deque()
# _lazy_tail: tail of the Deferred chain that serializes queue turns.
118 self._lazy_tail = defer.succeed(None)
# _pending: relpaths queued or in flight; deduplicates enqueues.
119 self._pending = set()
120 self._stopped = False
# Map a u"/"-separated relative path to a FilePath under the local dir.
123 def _get_filepath(self, relpath_u):
124 return extend_filepath(self._local_filepath, relpath_u.split(u"/"))
# Inverse of _get_filepath: FilePath -> u"/"-joined relative path.
126 def _get_relpath(self, filepath):
127 self._log("_get_relpath(%r)" % (filepath,))
128 segments = unicode_segments_from(filepath, self._local_filepath)
129 self._log("segments = %r" % (segments,))
130 return u"/".join(segments)
# Bump a per-folder stats counter named magic_folder.<name>.<counter>.
132 def _count(self, counter_name, delta=1):
133 ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
134 self._log("%s += %r" % (counter_name, delta))
135 self._client.stats_provider.count(ctr, delta)
# Log-and-passthrough callback, handy in Deferred chains.
137 def _logcb(self, res, msg):
138 self._log("%s: %r" % (msg, res))
142 s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
145 #open("events", "ab+").write(msg)
# Enqueue one relpath unless already pending or ignorable, then schedule a
# queue turn on the next reactor tick.
147 def _append_to_deque(self, relpath_u):
148 self._log("_append_to_deque(%r)" % (relpath_u,))
149 if relpath_u in self._pending or magicpath.should_ignore_file(relpath_u):
151 self._deque.append(relpath_u)
152 self._pending.add(relpath_u)
153 self._count('objects_queued')
155 self._clock.callLater(0, self._turn_deque)
# Pop one item and chain its processing onto _lazy_tail; on an empty deque
# defer to the subclass's _when_queue_is_empty().  After each item, wait
# _turn_delay (via the injected clock) before the next turn.
157 def _turn_deque(self):
158 self._log("_turn_deque")
163 item = self._deque.pop()
164 self._log("popped %r" % (item,))
165 self._count('objects_queued', -1)
167 self._log("deque is now empty")
168 self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
170 self._lazy_tail.addCallback(lambda ign: self._process(item))
171 self._lazy_tail.addBoth(self._call_hook, 'processed')
172 self._lazy_tail.addErrback(log.err)
173 self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
# Uploader: watches the local directory via inotify and pushes changed files
# and directories into the upload dircap, recording version numbers in the
# magic-folder db.
176 class Uploader(QueueMixin):
177 def __init__(self, client, local_path_u, db, upload_dircap, pending_delay, clock):
178 QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)
180 self.is_ready = False
182 # TODO: allow a path rather than a cap URI.
183 self._upload_dirnode = self._client.create_node_from_uri(upload_dircap)
# The upload cap must be a known, writeable directory.
184 if not IDirectoryNode.providedBy(self._upload_dirnode):
185 raise AssertionError("The URI in 'private/magic_folder_dircap' does not refer to a directory.")
186 if self._upload_dirnode.is_unknown() or self._upload_dirnode.is_readonly():
187 raise AssertionError("The URI in 'private/magic_folder_dircap' is not a writecap to a directory.")
189 self._inotify = get_inotify_module()
190 self._notifier = self._inotify.INotify()
# Only some notifier implementations support a coalescing delay.
192 if hasattr(self._notifier, 'set_pending_delay'):
193 self._notifier.set_pending_delay(pending_delay)
195 # TODO: what about IN_MOVE_SELF, IN_MOVED_FROM, or IN_UNMOUNT?
197 self.mask = ( self._inotify.IN_CREATE
198 | self._inotify.IN_CLOSE_WRITE
199 | self._inotify.IN_MOVED_TO
200 | self._inotify.IN_MOVED_FROM
201 | self._inotify.IN_DELETE
202 | self._inotify.IN_ONLYDIR
# NOTE(review): the closing of the mask expression and the tail of this
# watch() call (lines 203-207) are elided from this excerpt.
205 self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
# Begin reading inotify events; fires the 'started' hook on success or
# failure (addBoth).
208 def start_monitoring(self):
209 self._log("start_monitoring")
210 d = defer.succeed(None)
211 d.addCallback(lambda ign: self._notifier.startReading())
212 d.addCallback(lambda ign: self._count('dirs_monitored'))
213 d.addBoth(self._call_hook, 'started')
# (stop) -- tear down the notifier, then wait for the queue's tail; the
# enclosing 'def stop' line is elided from this excerpt.
218 self._notifier.stopReading()
219 self._count('dirs_monitored', -1)
220 if hasattr(self._notifier, 'wait_until_stopped'):
221 d = self._notifier.wait_until_stopped()
223 d = defer.succeed(None)
224 d.addCallback(lambda ign: self._lazy_tail)
# Seed the queue with every relpath known to the db, then start turning.
227 def start_scanning(self):
228 self._log("start_scanning")
230 self._pending = self._db.get_all_relpaths()
231 self._log("all_files %r" % (self._pending))
233 def _add_pending(ign):
234 # This adds all of the files that were in the db but not already processed
235 # (normally because they have been deleted on disk).
236 self._log("adding %r" % (self._pending))
237 self._deque.extend(self._pending)
238 d.addCallback(_add_pending)
239 d.addCallback(lambda ign: self._turn_deque())
# Queue the children of one local directory for processing.
242 def _scan(self, reldir_u):
243 self._log("scan %r" % (reldir_u,))
244 fp = self._get_filepath(reldir_u)
246 children = listdir_filepath(fp)
247 except EnvironmentError:
248 raise Exception("WARNING: magic folder: permission denied on directory %s"
249 % quote_filepath(fp))
250 except FilenameEncodingError:
251 raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
252 % quote_filepath(fp))
254 d = defer.succeed(None)
255 for child in children:
256 assert isinstance(child, unicode), child
# child=child binds the loop variable now (avoids the late-binding-closure
# pitfall in the lambda).
257 d.addCallback(lambda ign, child=child:
258 ("%s/%s" % (reldir_u, child) if reldir_u else child))
259 def _add_pending(relpath_u):
260 if magicpath.should_ignore_file(relpath_u):
263 self._pending.add(relpath_u)
265 d.addCallback(_add_pending)
266 # This call to _process doesn't go through the deque, and probably should.
267 d.addCallback(self._process)
268 d.addBoth(self._call_hook, 'processed')
269 d.addErrback(log.err)
# inotify callback: drop plain-file IN_CREATE (the file may still be open
# for writing); enqueue everything else by relative path.
273 def _notify(self, opaque, path, events_mask):
274 self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
276 # We filter out IN_CREATE events not associated with a directory.
277 # Acting on IN_CREATE for files could cause us to read and upload
278 # a possibly-incomplete file before the application has closed it.
279 # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
280 # It isn't possible to avoid watching for IN_CREATE at all, because
281 # it is the only event notified for a directory creation.
283 if ((events_mask & self._inotify.IN_CREATE) != 0 and
284 (events_mask & self._inotify.IN_ISDIR) == 0):
285 self._log("ignoring inotify event for creation of file %r\n" % (path,))
288 relpath_u = self._get_relpath(path)
289 self._append_to_deque(relpath_u)
# The uploader has no idle work: an empty queue simply yields.
291 def _when_queue_is_empty(self):
292 return defer.succeed(None)
# Process one queued relpath: classify it as disappeared / symlink /
# directory / file / special, upload accordingly, and record the resulting
# version in the db.  NOTE(review): many branch headers (if/else lines)
# are elided from this excerpt.
294 def _process(self, relpath_u):
295 self._log("_process(%r)" % (relpath_u,))
296 if relpath_u is None:
298 precondition(isinstance(relpath_u, unicode), relpath_u)
300 d = defer.succeed(None)
302 def _maybe_upload(val, now=None):
305 fp = self._get_filepath(relpath_u)
306 pathinfo = get_pathinfo(unicode_from_filepath(fp))
308 self._log("pending = %r, about to remove %r" % (self._pending, relpath_u))
309 self._pending.remove(relpath_u)
310 encoded_path_u = magicpath.path2magic(relpath_u)
# Case 1: the object vanished before we processed the notification --
# upload an empty placeholder carrying a bumped version.
312 if not pathinfo.exists:
313 # FIXME merge this with the 'isfile' case.
314 self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
315 self._count('objects_disappeared')
319 last_downloaded_timestamp = now
320 last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
322 current_version = self._db.get_local_file_version(relpath_u)
323 if current_version is None:
325 elif self._db.is_new_file(pathinfo, relpath_u):
326 new_version = current_version + 1
328 self._log("Not uploading %r" % (relpath_u,))
329 self._count('objects_not_uploaded')
# NOTE(review): line 333 of the metadata literal is elided; presumably
# it carries a 'deleted' marker -- confirm against the full source.
332 metadata = { 'version': new_version,
334 'last_downloaded_timestamp': last_downloaded_timestamp }
335 if last_downloaded_uri is not None:
336 metadata['last_downloaded_uri'] = last_downloaded_uri
338 empty_uploadable = Data("", self._client.convergence)
339 d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
340 metadata=metadata, overwrite=True)
342 def _add_db_entry(filenode):
343 filecap = filenode.get_uri()
344 self._db.did_upload_version(relpath_u, new_version, filecap,
345 last_downloaded_uri, last_downloaded_timestamp, pathinfo)
346 self._count('files_uploaded')
347 d2.addCallback(_add_db_entry)
# Case 2: symlinks are refused.
349 elif pathinfo.islink:
350 self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
# Case 3 (branch header elided): a directory -- watch it recursively,
# create the remote subdirectory entry, then scan its children.
353 self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)
354 uploadable = Data("", self._client.convergence)
355 encoded_path_u += magicpath.path2magic(u"/")
356 upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
358 self._log("created subdirectory %r" % (relpath_u,))
359 self._count('directories_created')
361 self._log("failed to create subdirectory %r" % (relpath_u,))
363 upload_d.addCallbacks(_succeeded, _failed)
364 upload_d.addCallback(lambda ign: self._scan(relpath_u))
# Case 4: a regular file -- upload its contents with bumped version
# metadata and record the new filecap in the db.
366 elif pathinfo.isfile:
367 last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
368 last_downloaded_timestamp = now
370 current_version = self._db.get_local_file_version(relpath_u)
371 if current_version is None:
373 elif self._db.is_new_file(pathinfo, relpath_u):
374 new_version = current_version + 1
376 self._log("Not uploading %r" % (relpath_u,))
377 self._count('objects_not_uploaded')
380 metadata = { 'version': new_version,
381 'last_downloaded_timestamp': last_downloaded_timestamp }
382 if last_downloaded_uri is not None:
383 metadata['last_downloaded_uri'] = last_downloaded_uri
385 uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
386 d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
387 metadata=metadata, overwrite=True)
389 def _add_db_entry(filenode):
390 filecap = filenode.get_uri()
391 last_downloaded_uri = metadata.get('last_downloaded_uri', None)
392 self._db.did_upload_version(relpath_u, new_version, filecap,
393 last_downloaded_uri, last_downloaded_timestamp, pathinfo)
394 self._count('files_uploaded')
395 d2.addCallback(_add_db_entry)
# Case 5: sockets, devices, etc. are refused.
398 self.warn("WARNING: cannot process special file %s" % quote_filepath(fp))
401 d.addCallback(_maybe_upload)
404 self._count('objects_succeeded')
407 self._count('objects_failed')
408 self._log("%r while processing %r" % (f, relpath_u))
410 d.addCallbacks(_succeeded, _failed)
# Fetch the remote metadata for one encoded path from the upload dirnode.
413 def _get_metadata(self, encoded_path_u):
415 d = self._upload_dirnode.get_metadata_for(encoded_path_u)
# Fetch the remote filenode for one encoded path from the upload dirnode.
420 def _get_filenode(self, encoded_path_u):
422 d = self._upload_dirnode.get(encoded_path_u)
# WriteFileMixin: writes downloaded contents to the local filesystem via a
# temporary file plus replace, diverting to a '.conflict' name when the
# replacement is unsafe or explicitly flagged as a conflict.
428 class WriteFileMixin(object):
# NOTE(review): FUDGE_SECONDS is referenced below but its definition (and
# the 'now'/'mode' setup lines) are elided from this excerpt.
431 def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
432 self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
433 % (abspath_u, len(file_contents), is_conflict, now))
435 # 1. Write a temporary file, say .foo.tmp.
436 # 2. is_conflict determines whether this is an overwrite or a conflict.
437 # 3. Set the mtime of the replacement file to be T seconds before the
438 # current local time.
439 # 4. Perform a file replacement with backup filename foo.backup,
440 # replaced file foo, and replacement file .foo.tmp. If any step of
441 # this operation fails, reclassify as a conflict and stop.
443 # Returns the path of the destination file.
445 precondition_abspath(abspath_u)
446 replacement_path_u = abspath_u + u".tmp" # FIXME more unique
447 backup_path_u = abspath_u + u".backup"
451 # ensure parent directory exists
452 head, tail = os.path.split(abspath_u)
# NOTE(review): 'mode' comes from an elided line (453) -- confirm its value.
454 fileutil.make_dirs(head, mode)
456 fileutil.write(replacement_path_u, file_contents)
# Backdated mtime so our own write does not look like a fresh local change.
457 os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
# Explicit conflict: park the replacement under '<path>.conflict'.
459 return self._rename_conflicted_file(abspath_u, replacement_path_u)
# Normal overwrite path; a ConflictError during replace reclassifies the
# write as a conflict.
462 fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
464 except fileutil.ConflictError:
465 return self._rename_conflicted_file(abspath_u, replacement_path_u)
467 def _rename_conflicted_file(self, abspath_u, replacement_path_u):
468 self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))
470 conflict_path_u = abspath_u + u".conflict"
471 fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
472 return conflict_path_u
# Downloader: periodically scans the read-only collective directory for
# remote versions newer than our local db, queues them, and writes them to
# the local filesystem via WriteFileMixin.
475 class Downloader(QueueMixin, WriteFileMixin):
476 REMOTE_SCAN_INTERVAL = 3 # facilitates tests
478 def __init__(self, client, local_path_u, db, collective_dircap, clock):
479 QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)
481 # TODO: allow a path rather than a cap URI.
482 self._collective_dirnode = self._client.create_node_from_uri(collective_dircap)
# The collective cap must be a known, read-only directory.
484 if not IDirectoryNode.providedBy(self._collective_dirnode):
485 raise AssertionError("The URI in 'private/collective_dircap' does not refer to a directory.")
486 if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly():
487 raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.")
489 self._turn_delay = self.REMOTE_SCAN_INTERVAL
490 self._download_scan_batch = {} # path -> [(filenode, metadata)]
# Kick off the first remote scan.
492 def start_scanning(self):
493 self._log("start_scanning")
494 files = self._db.get_all_relpaths()
495 self._log("all files %s" % files)
497 d = self._scan_remote_collective()
# NOTE(review): the next two lines appear to belong to a stop() method
# whose 'def' line is elided from this excerpt.
503 d = defer.succeed(None)
504 d.addCallback(lambda ign: self._lazy_tail)
507 def _should_download(self, relpath_u, remote_version):
509 _should_download returns a bool indicating whether or not a remote object should be downloaded.
510 We check the remote metadata version against our magic-folder db version number;
513 self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
# Ignorable paths are never downloaded (early-return line elided).
514 if magicpath.should_ignore_file(relpath_u):
518 v = self._db.get_local_file_version(relpath_u)
519 self._log("v = %r" % (v,))
# Download when we have no local version or ours is older.
520 return (v is None or v < remote_version)
522 def _get_local_latest(self, relpath_u):
524 _get_local_latest takes a unicode path string checks to see if this file object
525 exists in our magic-folder db; if not then return None
526 else check for an entry in our magic-folder db and return the version number.
528 if not self._get_filepath(relpath_u).exists():
530 return self._db.get_local_file_version(relpath_u)
532 def _get_collective_latest_file(self, filename):
534 _get_collective_latest_file takes a file path pointing to a file managed by
535 magic-folder and returns a deferred that fires with the two tuple containing a
536 file node and metadata for the latest version of the file located in the
537 magic-folder collective directory.
539 collective_dirmap_d = self._collective_dirnode.list()
# Fan out one get_child_and_metadata per participant directory; errors are
# consumed so one missing child does not fail the whole scan.
540 def scan_collective(result):
541 list_of_deferreds = []
542 for dir_name in result.keys():
543 # XXX make sure it's a directory
544 d = defer.succeed(None)
545 d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
546 list_of_deferreds.append(d)
547 deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
549 collective_dirmap_d.addCallback(scan_collective)
# Pick the (node, metadata) pair with the highest 'version'; initialization
# of max_version and the success-filter lines are elided from this excerpt.
550 def highest_version(deferredList):
554 for success, result in deferredList:
556 if result[1]['version'] > max_version:
557 node, metadata = result
558 max_version = result[1]['version']
559 return node, metadata
560 collective_dirmap_d.addCallback(highest_version)
561 return collective_dirmap_d
563 def _append_to_batch(self, name, file_node, metadata):
564 if self._download_scan_batch.has_key(name):
565 self._download_scan_batch[name] += [(file_node, metadata)]
567 self._download_scan_batch[name] = [(file_node, metadata)]
# Scan one participant's directory listing and batch every entry whose
# remote version is newer than (or unknown relative to) our local version.
# NOTE(review): 'd' is used below but the 'd = dirnode.list()' line is
# elided from this excerpt.
569 def _scan_remote(self, nickname, dirnode):
570 self._log("_scan_remote nickname %r" % (nickname,))
572 def scan_listing(listing_map):
573 for encoded_relpath_u in listing_map.keys():
574 relpath_u = magicpath.magic2path(encoded_relpath_u)
575 self._log("found %r" % (relpath_u,))
577 file_node, metadata = listing_map[encoded_relpath_u]
578 local_version = self._get_local_latest(relpath_u)
579 remote_version = metadata.get('version', None)
580 self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))
581 if local_version is None or remote_version is None or local_version < remote_version:
582 self._log("%r added to download queue" % (relpath_u,))
583 self._append_to_batch(relpath_u, file_node, metadata)
584 d.addCallback(scan_listing)
585 d.addBoth(self._logcb, "end of _scan_remote")
# Scan every participant in the collective, then filter the batch and feed
# the surviving items into the download deque.
588 def _scan_remote_collective(self):
589 self._log("_scan_remote_collective")
590 self._download_scan_batch = {} # XXX
592 if self._collective_dirnode is None:
594 collective_dirmap_d = self._collective_dirnode.list()
# NOTE(review): the 'def do_list(result):' line is elided from this excerpt.
596 others = [x for x in result.keys()]
597 return result, others
598 collective_dirmap_d.addCallback(do_list)
599 def scan_collective(result):
600 d = defer.succeed(None)
601 collective_dirmap, others_list = result
# dir_name=dir_name binds the loop variable now (late-binding pitfall).
602 for dir_name in others_list:
603 d.addCallback(lambda x, dir_name=dir_name: self._scan_remote(dir_name, collective_dirmap[dir_name][0]))
604 # XXX todo add errback
606 collective_dirmap_d.addCallback(scan_collective)
607 collective_dirmap_d.addCallback(self._filter_scan_batch)
608 collective_dirmap_d.addCallback(self._add_batch_to_download_queue)
609 return collective_dirmap_d
# Push filtered (relpath, node, metadata) triples onto the deque and mark
# their relpaths pending.
611 def _add_batch_to_download_queue(self, result):
612 self._log("result = %r" % (result,))
613 self._log("deque = %r" % (self._deque,))
614 self._deque.extend(result)
615 self._log("deque after = %r" % (self._deque,))
616 self._count('objects_queued', len(result))
617 self._log("pending = %r" % (self._pending,))
# x[0] is the relpath component of each queued triple.
618 self._pending.update(map(lambda x: x[0], result))
619 self._log("pending after = %r" % (self._pending,))
# Reduce the scan batch to the highest-versioned candidate per path, keeping
# only those _should_download approves and not already pending.
621 def _filter_scan_batch(self, result):
622 self._log("_filter_scan_batch")
623 extension = [] # consider whether this should be a dict
624 for relpath_u in self._download_scan_batch.keys():
# Already queued or in flight: skip (the 'continue' line is elided).
625 if relpath_u in self._pending:
627 file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version'])
628 if self._should_download(relpath_u, metadata['version']):
629 extension += [(relpath_u, file_node, metadata)]
631 self._log("Excluding %r" % (relpath_u,))
632 self._count('objects_excluded')
633 self._call_hook(None, 'processed')
# Idle behavior: wait _turn_delay, rescan the collective, then turn again.
636 def _when_queue_is_empty(self):
637 d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
638 d.addBoth(self._logcb, "after _scan_remote_collective")
639 d.addCallback(lambda ign: self._turn_deque())
# Process one queued (relpath, file_node, metadata) item: mkdir for
# directory entries, otherwise download the best version and either unlink
# (deletion) or write the contents locally, then update the db.
642 def _process(self, item, now=None):
643 self._log("_process(%r)" % (item,))
646 (relpath_u, file_node, metadata) = item
647 fp = self._get_filepath(relpath_u)
648 abspath_u = unicode_from_filepath(fp)
650 d = defer.succeed(None)
# A trailing u"/" marks a directory entry.
651 if relpath_u.endswith(u"/"):
652 self._log("mkdir(%r)" % (abspath_u,))
653 d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
654 d.addCallback(lambda ign: abspath_u)
656 d.addCallback(lambda ign: file_node.download_best_version())
657 if metadata.get('deleted', False):
658 d.addCallback(lambda result: self._unlink_deleted_file(abspath_u, result))
660 d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents, is_conflict=False))
662 def do_update_db(written_abspath_u):
663 filecap = file_node.get_uri()
664 last_uploaded_uri = metadata.get('last_uploaded_uri', None)
665 last_downloaded_uri = filecap
666 last_downloaded_timestamp = now
667 written_pathinfo = get_pathinfo(written_abspath_u)
668 if not written_pathinfo.exists and not metadata.get('deleted', False):
669 raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))
671 self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
672 last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
673 self._count('objects_downloaded')
# NOTE(review): the 'def failed(f):' line is elided from this excerpt.
675 self._log("download failed: %s" % (str(f),))
676 self._count('objects_failed')
678 d.addCallbacks(do_update_db, failed)
679 def remove_from_pending(res):
680 self._pending.remove(relpath_u)
682 d.addBoth(remove_from_pending)
# Handle a remote deletion: move the local file aside to '<path>.backup';
# if it is already gone, just log.  NOTE(review): 'shutil' is used here but
# no 'import shutil' is visible in this excerpt's import block -- confirm.
685 def _unlink_deleted_file(self, abspath_u, result):
687 self._log('unlinking: %s' % (abspath_u,))
688 shutil.move(abspath_u, abspath_u + '.backup')
690 self._log("Already gone: '%s'" % (abspath_u,))