import os, sys, stat, time

from collections import deque

from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import HookMixin
from allmydata.util.encodingutil import listdir_unicode, to_filepath, \
     unicode_from_filepath, quote_local_unicode_path, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
from allmydata import backupdb, magicpath

# inotify flag: don't generate events for children after they have been
# unlinked from the watched directory.
IN_EXCL_UNLINK = 0x04000000L

def get_inotify_module():
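    """
    Return the platform-appropriate filesystem-notification module: the
    allmydata.windows inotify shim on Windows, or Twisted's inotify support
    on Linux. Raises NotImplementedError on unsupported platforms.
    """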
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for drop-upload is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for drop-upload is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise


class MagicFolder(service.MultiService):
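    """
    Service that ties together an Uploader and a Downloader for one local
    directory: local changes are uploaded to the client's upload dirnode, and
    newer versions found in the collective dirnode are downloaded.
    """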

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, inotify=None,
                 pending_delay=60):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        db = backupdb.get_backupdb(dbfile, create_version=(backupdb.SCHEMA_v3, 3))
        if db is None:
            raise Exception('ERROR: Unable to load magic folder db.')

        self.uploader = Uploader(client, local_path_u, db, upload_dircap, inotify, pending_delay)
        self.downloader = Downloader(client, local_path_u, db, collective_dircap)
    def startService(self):
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """ready is used to signal us to start
        processing the upload and download items...
        """
        d = self.uploader.start_scanning()
        d2 = self.downloader.start_scanning()
        d.addCallback(lambda ign: d2)
        return d
    def finish(self):
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d
    def remove_service(self):
        return service.MultiService.disownServiceParent(self)


class QueueMixin(HookMixin):
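    """
    Shared behaviour for Uploader and Downloader: a deque of pending work
    items, a 'lazy tail' Deferred that processes them one at a time, and
    per-folder statistics counters.
    """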
    def __init__(self, client, local_path_u, db, name):
        self._client = client
        self._local_path_u = local_path_u
        self._local_path = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._hooks = {'processed': None}

        if not self._local_path.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_path.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        self._lazy_tail = defer.succeed(None)
        self._pending = set()
        self._stopped = False
    def _count(self, counter_name, delta=1):
        self._client.stats_provider.count('magic_folder.%s.%s' % (self._name, counter_name), delta)

    def _log(self, msg):
        s = "Magic Folder %s: %s" % (self._name, msg)
        self._client.log(s)
        #open("events", "ab+").write(msg)
    def _append_to_deque(self, path):
        if path in self._pending:
            return
        self._deque.append(path)
        self._pending.add(path)
        self._count('objects_queued')
        reactor.callLater(0, self._turn_deque)
    def _turn_deque(self):
        if self._stopped:
            return
        try:
            item = self._deque.pop()
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._turn_delay, self._turn_deque))


class Uploader(QueueMixin):
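    """
    Watches the local directory with inotify and uploads created, modified,
    moved, and deleted objects to the client's upload dirnode.
    """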
    def __init__(self, client, local_path_u, db, upload_dircap, inotify, pending_delay):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader')

        self.is_ready = False

        # TODO: allow a path rather than a cap URI.
        self._upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        if not IDirectoryNode.providedBy(self._upload_dirnode):
            raise AssertionError("The URI in 'private/magic_folder_dircap' does not refer to a directory.")
        if self._upload_dirnode.is_unknown() or self._upload_dirnode.is_readonly():
            raise AssertionError("The URI in 'private/magic_folder_dircap' is not a writecap to a directory.")

        self._inotify = inotify or get_inotify_module()
        self._notifier = self._inotify.INotify()

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # We don't watch for IN_CREATE, because that would cause us to read and upload a
        # possibly-incomplete file before the application has closed it. There should always
        # be an IN_CLOSE_WRITE after an IN_CREATE (I think).
        # TODO: what about IN_MOVE_SELF, IN_MOVED_FROM, or IN_UNMOUNT?
        self.mask = ( self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_path, mask=self.mask, callbacks=[self._notify],
                             recursive=True)
    def start_monitoring(self):
        d = self._notifier.startReading()
        self._count('dirs_monitored')
        return d

    def stop(self):
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d
    def start_scanning(self):
        self.is_ready = True
        d = self._scan(self._local_path_u) # XXX do not want dropped deferreds!
        return d
    def _scan(self, local_path_u): # XXX should this take a FilePath?
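        """
        List the given local directory and hand each child to _process_child,
        which queues files and recurses into subdirectories. Returns a
        Deferred that fires once every child has been examined.
        """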
        if not os.path.isdir(local_path_u):
            raise AssertionError("Programmer error: _scan() must be passed a directory path.")
        quoted_path = quote_local_unicode_path(local_path_u)
        try:
            children = listdir_unicode(local_path_u)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s" % (quoted_path,))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error" % (quoted_path,))

        d = defer.succeed(None)
        for child in children:
            assert isinstance(child, unicode), child
            d.addCallback(lambda ign, child=child: os.path.join(local_path_u, child))
            d.addCallback(self._process_child)
            d.addErrback(log.err)
        return d
    def _notify(self, opaque, path, events_mask):
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        path_u = unicode_from_filepath(path)
        self._append_to_deque(path_u)

    def _when_queue_is_empty(self):
        return defer.succeed(None)
    def _process_child(self, path_u):
        precondition(isinstance(path_u, unicode), path_u)

        pathinfo = get_pathinfo(path_u)

        if pathinfo.islink:
            self.warn("WARNING: cannot backup symlink %s" % quote_local_unicode_path(path_u))
            return None
        elif pathinfo.isdir:
            # process directories unconditionally
            self._append_to_deque(path_u)

            # recurse on the child directory
            return self._scan(path_u)
        elif pathinfo.isfile:
            file_version = self._db.get_local_file_version(path_u)
            if file_version is None:
                # XXX upload if we didn't record our version in magicfolder db?
                self._append_to_deque(path_u)
                return None
            else:
                d2 = self._get_collective_latest_file(path_u)
                def _got_latest_file((file_node, metadata)):
                    collective_version = metadata['version']
                    if collective_version is None:
                        return None
                    if file_version > collective_version:
                        self._append_to_upload_deque(path_u)
                    elif file_version < collective_version: # FIXME Daira thinks this is wrong
                        # if a collective version of the file is newer than ours
                        # we must download it and unlink the old file from our upload dirnode
                        self._append_to_download_deque(path_u)
                        # XXX where should we save the returned deferred?
                        return self._upload_dirnode.delete(path_u, must_be_file=True)
                    else:
                        # XXX same version. do nothing.
                        pass
                d2.addCallback(_got_latest_file)
                return d2
        else:
            self.warn("WARNING: cannot backup special file %s" % quote_local_unicode_path(path_u))
            return None
    def _process(self, path_u):
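        """
        Handle one queued local path: upload new or changed files, create
        remote entries for new subdirectories (watching and scanning them),
        record deletions of objects that have disappeared, and update the
        magic-folder db and statistics counters accordingly.
        """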
        precondition(isinstance(path_u, unicode), path_u)

        d = defer.succeed(None)

        def _maybe_upload(val):
            pathinfo = get_pathinfo(path_u)

            self._pending.remove(path_u) # FIXME make _upload_pending hold relative paths
            relpath_u = os.path.relpath(path_u, self._local_path_u)
            encoded_name_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                self._log("drop-upload: notified object %r disappeared "
                          "(this is normal for temporary objects)" % (path_u,))
                self._count('objects_disappeared')
                d2 = defer.succeed(None)
                if self._db.check_file_db_exists(relpath_u):
                    d2.addCallback(lambda ign: self._get_metadata(encoded_name_u))
                    current_version = self._db.get_local_file_version(relpath_u) + 1
                    def set_deleted(metadata):
                        metadata['version'] = current_version
                        metadata['deleted'] = True
                        empty_uploadable = Data("", self._client.convergence)
                        return self._upload_dirnode.add_file(encoded_name_u, empty_uploadable, overwrite=True, metadata=metadata)
                    d2.addCallback(set_deleted)
                    def add_db_entry(filenode):
                        filecap = filenode.get_uri()
                        self._db.did_upload_file(filecap, relpath_u, current_version, int(mtime), int(ctime), size)
                        self._count('files_uploaded')
                    d2.addCallback(lambda x: self._get_filenode(encoded_name_u))
                    d2.addCallback(add_db_entry)
                else:
                    d2.addCallback(lambda x: Failure(Exception("file does not exist"))) # FIXME wrong
                return d2
            elif pathinfo.islink:
                self.warn("WARNING: cannot upload symlink %s" % quote_local_unicode_path(path_u))
                return None
            elif pathinfo.isdir:
                self._notifier.watch(to_filepath(path_u), mask=self.mask, callbacks=[self._notify], recursive=True)
                uploadable = Data("", self._client.convergence)
                encoded_name_u += u"@_"
                upload_d = self._upload_dirnode.add_file(encoded_name_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (path_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (path_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(path_u))
                return upload_d
            elif pathinfo.isfile:
                version = self._db.get_local_file_version(relpath_u)

                uploadable = FileName(path_u, self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_name_u, uploadable, metadata={"version":version}, overwrite=True)
                def add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    # XXX maybe just pass pathinfo
                    self._db.did_upload_file(filecap, relpath_u, version,
                                             pathinfo.mtime, pathinfo.ctime, pathinfo.size)
                    self._count('files_uploaded')
                d2.addCallback(add_db_entry)
                return d2
            else:
                self.warn("WARNING: cannot process special file %s" % quote_local_unicode_path(path_u))

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_queued', -1)
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_queued', -1)
            self._count('objects_failed')
            self._log("%r while processing %r" % (f, path_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d
    def _get_metadata(self, encoded_name_u):
        d = self._upload_dirnode.get_metadata_for(encoded_name_u)
        return d

    def _get_filenode(self, encoded_name_u):
        d = self._upload_dirnode.get(encoded_name_u)
        return d


class Downloader(QueueMixin):
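    """
    Periodically scans the other participants' directories in the collective
    dirnode and downloads any file whose remote version is newer than the
    version recorded in the local magic-folder db.
    """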
    def __init__(self, client, local_path_u, db, collective_dircap):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader')

        # TODO: allow a path rather than a cap URI.
        self._collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        if not IDirectoryNode.providedBy(self._collective_dirnode):
            raise AssertionError("The URI in 'private/collective_dircap' does not refer to a directory.")
        if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly():
            raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.")

        self._turn_delay = 3 # delay between remote scans
        self._download_scan_batch = {} # path -> [(filenode, metadata)]
    def start_scanning(self):
        self._log("\nstart_scanning")
        files = self._db.get_all_files()
        self._log("all files %s" % files)

        d = self._scan_remote_collective()
        return d

    def stop(self):
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d
    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        the remote object should be downloaded if we have no local version, or ours is older.
        """
        v = self._db.get_local_file_version(relpath_u)
        return (v is None or v < remote_version)
    def _get_local_latest(self, path_u):
        """_get_local_latest takes a unicode path string and checks whether the file
        exists locally; if it does not, return None, otherwise look up the entry in
        our magic-folder db and return its version number.
        """
        if not os.path.exists(path_u):
            return None
        return self._db.get_local_file_version(path_u)
    def _get_collective_latest_file(self, filename):
        """_get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with a two-tuple containing the
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d
    def _append_to_batch(self, name, file_node, metadata):
        if name in self._download_scan_batch:
            self._download_scan_batch[name] += [(file_node, metadata)]
        else:
            self._download_scan_batch[name] = [(file_node, metadata)]
    def _scan_remote(self, nickname, dirnode):
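        """
        List one participant's directory and add every file for which we do
        not already have the latest remote version to the download scan batch.
        """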
        self._log("_scan_remote nickname %s" % nickname)
        d = dirnode.list()
        def scan_listing(listing_map):
            for name in listing_map.keys():
                file_node, metadata = listing_map[name]
                local_version = self._get_local_latest(name)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (name, local_version, remote_version))
                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("added to download queue\n")
                    self._append_to_batch(name, file_node, metadata)
        d.addCallback(scan_listing)
        return d
    def _scan_remote_collective(self):
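        """
        Scan every other participant's directory in the collective, filter the
        collected batch down to the files that should actually be downloaded,
        and extend the download queue with them.
        """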
        self._log("_scan_remote_collective")
        self._download_scan_batch = {} # XXX

        if self._collective_dirnode is None:
            return
        collective_dirmap_d = self._collective_dirnode.list()
        def do_list(result):
            others = [x for x in result.keys()]
            return result, others
        collective_dirmap_d.addCallback(do_list)
        def scan_collective(result):
            d = defer.succeed(None)
            collective_dirmap, others_list = result
            for dir_name in others_list:
                d.addCallback(lambda x, dir_name=dir_name: self._scan_remote(dir_name, collective_dirmap[dir_name][0]))
                # XXX todo add errback
            return d
        collective_dirmap_d.addCallback(scan_collective)
        collective_dirmap_d.addCallback(self._filter_scan_batch)
        collective_dirmap_d.addCallback(self._add_batch_to_download_queue)
        return collective_dirmap_d
    def _add_batch_to_download_queue(self, result):
        self._deque.extend(result)
        self._pending.update(map(lambda x: x[0], result))
    def _filter_scan_batch(self, result):
        extension = [] # consider whether this should be a dict
        for name in self._download_scan_batch.keys():
            if name in self._pending:
                continue
            file_node, metadata = max(self._download_scan_batch[name], key=lambda x: x[1]['version'])
            if self._should_download(name, metadata['version']):
                extension += [(name, file_node, metadata)]
        return extension
    def _when_queue_is_empty(self):
        d = task.deferLater(reactor, self._turn_delay, self._scan_remote_collective)
        d.addCallback(lambda ign: self._turn_deque())
        return d
    def _process(self, item):
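        """
        Download the best available version of one queued item, write it into
        the local directory, record the new version in the magic-folder db,
        and update the download counters.
        """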
        (name, file_node, metadata) = item
        d = file_node.download_best_version()
        def succeeded(res):
            d2 = defer.succeed(res)
            absname = fileutil.abspath_expanduser_unicode(name, base=self._local_path_u)
            d2.addCallback(lambda result: self._write_downloaded_file(absname, result, is_conflict=False))
            def do_update_db(full_path):
                filecap = file_node.get_uri()
                try:
                    s = os.stat(full_path)
                except OSError:
                    raise Exception("downloaded file %s disappeared" % full_path)
                size = s[stat.ST_SIZE]
                ctime = s[stat.ST_CTIME]
                mtime = s[stat.ST_MTIME]
                self._db.did_upload_file(filecap, name, metadata['version'], mtime, ctime, size)
            d2.addCallback(do_update_db)
            # XXX handle failure here with addErrback...
            self._count('objects_downloaded')
            return d2
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_download_failed')
            return f
        d.addCallbacks(succeeded, failed)
        def remove_from_pending(res):
            self._pending.remove(name)
            return res
        d.addBoth(remove_from_pending)
        return d
    @classmethod
    def _write_downloaded_file(cls, path, file_contents, base, is_conflict=False, now=None):
        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition(isinstance(path, unicode), path=path)
        path = fileutil.abspath_expanduser_unicode(path, base=base)
        replacement_path = path + u".tmp" # FIXME more unique
        backup_path = path + u".backup"
        if now is None:
            now = time.time()

        fileutil.write(replacement_path, file_contents)
        os.utime(replacement_path, (now, now - cls.FUDGE_SECONDS))
        if is_conflict:
            return cls._rename_conflicted_file(path, replacement_path)
        else:
            try:
                fileutil.replace_file(path, replacement_path, backup_path)
                return path
            except fileutil.ConflictError:
                return cls._rename_conflicted_file(path, replacement_path)
    @classmethod
    def _rename_conflicted_file(self, path, replacement_path):
        conflict_path = path + u".conflict"
        fileutil.rename_no_overwrite(replacement_path, conflict_path)
        return conflict_path