import os
import sys
import time
from collections import deque

from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import HookMixin
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
     extend_filepath, unicode_from_filepath, unicode_segments_from, \
     quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
from allmydata import backupdb, magicpath
# Bit value of the Linux inotify IN_EXCL_UNLINK flag (see inotify(7)), spelled
# as a literal here. Written without the Python-2-only "L" suffix; the numeric
# value is unchanged.
IN_EXCL_UNLINK = 0x04000000
def get_inotify_module():
    """Return the inotify-style module appropriate for this platform.

    On Windows (``sys.platform == "win32"``) this is ``allmydata.windows.inotify``;
    when ``runtime.platform.supportsINotify()`` is true it is
    ``twisted.internet.inotify``.

    Raises:
        NotImplementedError: on any other platform, or on Windows when the
            import fails with ImportError or AttributeError.
        ImportError, AttributeError: re-raised unchanged (after being logged)
            when the import fails on a non-Windows platform.
    """
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for drop-upload is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for drop-upload is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        # Not on Windows: let the original import problem propagate.
        raise
class MagicFolder(service.MultiService):
    """Service that drives an Uploader and a Downloader over one local directory.

    Both workers are built from the same client, local path, database and
    clock; the uploader is given ``upload_dircap`` and the downloader
    ``collective_dircap``.
    """
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
                 pending_delay=1.0, clock=reactor):
        """Open the magic-folder database and construct both workers.

        Args:
            client: client object handed through to Uploader and Downloader.
            upload_dircap: cap URI passed to the Uploader.
            collective_dircap: cap URI passed to the Downloader.
            local_path_u: absolute unicode path of the local directory
                (checked with ``precondition_abspath``).
            dbfile: path given to ``backupdb.get_backupdb``.
            pending_delay: passed to the Uploader (used for the notifier's
                pending delay when the notifier supports it).
            clock: reactor-like object used for scheduling.

        Raises:
            Exception: if ``backupdb.get_backupdb`` returns None.
        """
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        db = backupdb.get_backupdb(dbfile, create_version=(backupdb.SCHEMA_v3, 3))
        if db is None:
            # __init__ may not return a value: returning a Failure here would
            # surface as an unrelated TypeError, so raise the real error.
            raise Exception('ERROR: Unable to load magic folder db.')

        # for tests
        self._client = client
        self._db = db

        self.is_ready = False

        self.uploader = Uploader(client, local_path_u, db, upload_dircap, pending_delay, clock)
        self.downloader = Downloader(client, local_path_u, db, collective_dircap, clock)

    def startService(self):
        """Start the service and begin monitoring the local directory.

        Returns a Deferred: an already-fired one if the service is running,
        otherwise the result of ``uploader.start_monitoring()``.
        """
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        print("%r.startService" % (self,))
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """ready is used to signal us to start
        processing the upload and download items...

        Returns the uploader's scanning Deferred, chained so that it does not
        fire until the downloader's scanning Deferred has fired too.
        """
        self.is_ready = True
        upload_d = self.uploader.start_scanning()
        download_d = self.downloader.start_scanning()
        upload_d.addCallback(lambda ign: download_d)
        return upload_d

    def finish(self):
        """Stop both workers; the returned Deferred waits for both stops."""
        print("finish")
        upload_d = self.uploader.stop()
        download_d = self.downloader.stop()
        upload_d.addCallback(lambda ign: download_d)
        return upload_d

    def remove_service(self):
        """Detach this service from its parent service."""
        return service.MultiService.disownServiceParent(self)
class QueueMixin(HookMixin):
    """Shared work-queue machinery for the Uploader and the Downloader.

    Subclasses supply ``_process(item)`` and ``_when_queue_is_empty()``.
    ``_append_to_deque`` also reads ``self.is_ready``, which this class does
    not set itself. ``set_hook`` / ``_call_hook`` are presumably provided by
    HookMixin -- confirm against allmydata.util.deferredutil.
    """

    def __init__(self, client, local_path_u, db, name, clock):
        """Record the collaborators and validate the local directory.

        Raises:
            AssertionError: if ``local_path_u`` does not exist or is not a
                directory.
        """
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        # Every queued operation is chained onto this Deferred.
        self._lazy_tail = defer.succeed(None)
        self._pending = set()
        self._stopped = False
        # Delay between turns of the deque; subclasses may override it.
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        """Map a "/"-separated relative path onto a FilePath under the local directory."""
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        """Return the "/"-joined path of ``filepath`` relative to the local directory."""
        print("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        print("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        """Add ``delta`` to the stats counter ``magic_folder.<name>.<counter_name>``."""
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        print("%r += %r" % (ctr, delta))
        self._client.stats_provider.count(ctr, delta)

    def _log(self, msg):
        """Log ``msg`` through the client, prefixed with the nickname and worker name."""
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print(s)
        #open("events", "ab+").write(msg)

    def _append_to_deque(self, relpath_u):
        """Queue ``relpath_u`` unless it is already pending or should be ignored."""
        print("_append_to_deque(%r)" % (relpath_u,))
        if relpath_u in self._pending or magicpath.should_ignore_file(relpath_u):
            return
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            self._clock.callLater(0, self._turn_deque)

    def _turn_deque(self):
        """Take one item off the deque and chain its processing onto ``_lazy_tail``.

        Does nothing once ``_stopped`` is set. When the deque is empty,
        ``_when_queue_is_empty`` is chained instead.
        """
        if self._stopped:
            return
        try:
            # pop() takes from the right-hand end, i.e. the most recently
            # appended item.
            item = self._deque.pop()
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            # Schedule the next turn after the configured delay.
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
class Uploader(QueueMixin):
    """Watches the local directory and uploads changed objects to the upload dirnode."""

    def __init__(self, client, local_path_u, db, upload_dircap, pending_delay, clock):
        """Resolve the upload dirnode and register a recursive watch.

        Raises:
            AssertionError: if ``upload_dircap`` does not refer to a writeable
                directory node (or for the local-directory checks done by
                QueueMixin).
        """
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False

        # TODO: allow a path rather than a cap URI.
        self._upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        if not IDirectoryNode.providedBy(self._upload_dirnode):
            raise AssertionError("The URI in 'private/magic_folder_dircap' does not refer to a directory.")
        if self._upload_dirnode.is_unknown() or self._upload_dirnode.is_readonly():
            raise AssertionError("The URI in 'private/magic_folder_dircap' is not a writecap to a directory.")

        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()

        # Not every notifier implementation offers this knob.
        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # We don't watch for IN_CREATE, because that would cause us to read and upload a
        # possibly-incomplete file before the application has closed it. There should always
        # be an IN_CLOSE_WRITE after an IN_CREATE (I think).
        # TODO: what about IN_MOVE_SELF, IN_MOVED_FROM, or IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        """Start reading notifications; fires the 'started' hook when done."""
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        """Stop reading notifications and wait for queued work to drain.

        Returns a Deferred that fires after the notifier has stopped (when it
        can report that) and ``_lazy_tail`` has fired.
        """
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        """Scan the whole local directory, then queue what the db still lists.

        ``_pending`` is seeded with every relpath the db knows; whatever the
        scan did not process is pushed onto the deque afterwards.
        """
        self._log("start_scanning")
        self.is_ready = True
        self._pending = self._db.get_all_relpaths()
        print("all_files %r" % (self._pending,))
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already processed
            # (normally because they have been deleted on disk).
            print("adding %r" % (self._pending,))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
        """Process every child of the directory ``reldir_u`` (relative, "" for the root).

        Raises:
            Exception: if the directory cannot be listed.
        """
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        def _add_pending(relpath_u):
            # Ignored names become None, which _process treats as a no-op.
            if magicpath.should_ignore_file(relpath_u):
                return None

            self._pending.add(relpath_u)
            return relpath_u

        d = defer.succeed(None)
        for child in children:
            assert isinstance(child, unicode), child
            # Bind child as a default so each callback sees its own name.
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def _notify(self, opaque, path, events_mask):
        """Notifier callback: queue the relative path of the object that changed."""
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)
        self._append_to_deque(relpath_u)

    def _when_queue_is_empty(self):
        """Nothing to do for the uploader when the deque runs dry."""
        return defer.succeed(None)

    def _process(self, relpath_u):
        """Upload, or record the deletion of, the local object at ``relpath_u``.

        ``None`` is accepted and ignored. Returns a Deferred; successes bump
        'objects_succeeded', failures bump 'objects_failed' and are passed on.
        """
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val):
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            print("pending = %r, about to remove %r" % (self._pending, relpath_u))
            # discard rather than remove: the same path can be processed both
            # by a scan and via the deque, and the second pass must not fail
            # with KeyError.
            self._pending.discard(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')
                d2 = defer.succeed(None)
                if self._db.check_file_db_exists(relpath_u):
                    # Known to the db: upload an empty file carrying deleted=True
                    # metadata and a bumped version.
                    d2.addCallback(lambda ign: self._get_metadata(encoded_path_u))
                    current_version = self._db.get_local_file_version(relpath_u) + 1
                    def set_deleted(metadata):
                        metadata['version'] = current_version
                        metadata['deleted'] = True
                        empty_uploadable = Data("", self._client.convergence)
                        return self._upload_dirnode.add_file(encoded_path_u, empty_uploadable, overwrite=True, metadata=metadata)
                    d2.addCallback(set_deleted)
                    def add_db_entry(filenode):
                        filecap = filenode.get_uri()
                        self._db.did_upload_version(filecap, relpath_u, current_version, pathinfo)
                        self._count('files_uploaded')
                    # FIXME consider whether it's correct to retrieve the filenode again.
                    d2.addCallback(lambda x: self._get_filenode(encoded_path_u))
                    d2.addCallback(add_db_entry)

                d2.addCallback(lambda x: Exception("file does not exist"))  # FIXME wrong
                return d2
            elif pathinfo.islink:
                # This class has no warn() method; log the warning instead.
                self._log("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)
                uploadable = Data("", self._client.convergence)
                # A directory is recorded as an empty file whose encoded name
                # ends with the encoded "/".
                encoded_path_u += magicpath.path2magic(u"/")
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                version = self._db.get_local_file_version(relpath_u)
                if version is None:
                    version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    version += 1
                else:
                    # Unchanged since the last recorded upload.
                    return None

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":version}, overwrite=True)
                def add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    self._db.did_upload_version(filecap, relpath_u, version, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(add_db_entry)
                return d2
            else:
                # This class has no warn() method; log the warning instead.
                self._log("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            print(f)
            self._count('objects_failed')
            self._log("%r while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        """Return the upload dirnode's metadata lookup for ``encoded_path_u``.

        A KeyError raised by the call itself is returned as a Failure.
        """
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        """Return the upload dirnode's child lookup for ``encoded_path_u``.

        A KeyError raised by the call itself is returned as a Failure.
        """
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d
class Downloader(QueueMixin):
    """Polls the collective directory and downloads newer remote versions."""

    def __init__(self, client, local_path_u, db, collective_dircap, clock):
        """Resolve the collective dirnode, which must be a read-only directory.

        Raises:
            AssertionError: if ``collective_dircap`` is not a read-only cap to
                a directory (or for the local-directory checks done by
                QueueMixin).
        """
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        # TODO: allow a path rather than a cap URI.
        self._collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        if not IDirectoryNode.providedBy(self._collective_dirnode):
            raise AssertionError("The URI in 'private/collective_dircap' does not refer to a directory.")
        if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly():
            raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.")

        self._turn_delay = 3 # delay between remote scans
        self._download_scan_batch = {} # path -> [(filenode, metadata)]

    def start_scanning(self):
        """Run a first remote scan and start turning the deque.

        Returns whatever ``_scan_remote_collective`` returns.
        """
        self._log("\nstart_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective()
        self._turn_deque()
        return d

    def stop(self):
        """Stop turning the deque; the returned Deferred waits for ``_lazy_tail``."""
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.
        """
        if magicpath.should_ignore_file(relpath_u):
            return False
        v = self._db.get_local_file_version(relpath_u)
        return (v is None or v < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode path string and checks whether the
        object exists in the local directory; if not, it returns None,
        otherwise it returns the version number recorded in our magic-folder db.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        return self._db.get_local_file_version(relpath_u)

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with the two tuple containing a
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory. Fires with (None, None) when no
        lookup succeeded.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            # consumeErrors=True: a failed lookup becomes a (False, failure) entry.
            return defer.DeferredList(list_of_deferreds, consumeErrors=True)
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            # Track the best entry itself instead of comparing against an
            # initial version number, so a file whose highest version is 0
            # is still found.
            best = None
            for success, result in deferredList:
                if not success:
                    continue
                if best is None or result[1]['version'] > best[1]['version']:
                    best = result
            if best is None:
                return None, None
            node, metadata = best
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _append_to_batch(self, name, file_node, metadata):
        """Record one (file_node, metadata) candidate for ``name`` in the scan batch."""
        self._download_scan_batch.setdefault(name, []).append((file_node, metadata))

    def _scan_remote(self, nickname, dirnode):
        """List one participant's ``dirnode`` and batch entries that look newer.

        An entry is batched when the local or the remote version is unknown
        (None), or when the local version is lower than the remote one.
        """
        self._log("_scan_remote nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for name, (file_node, metadata) in listing_map.items():
                local_version = self._get_local_latest(name)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (name, local_version, remote_version))
                # NOTE(review): entries without a 'version' are batched here,
                # but _filter_scan_batch indexes metadata['version'] directly.
                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("added to download queue\n")
                    self._append_to_batch(name, file_node, metadata)
        d.addCallback(scan_listing)
        return d

    def _scan_remote_collective(self):
        """Scan every directory in the collective and queue what should be downloaded.

        Returns a Deferred, or None when there is no collective dirnode.
        """
        self._log("_scan_remote_collective")
        self._download_scan_batch = {} # XXX

        if self._collective_dirnode is None:
            return
        collective_dirmap_d = self._collective_dirnode.list()
        def do_list(result):
            others = list(result.keys())
            return result, others
        collective_dirmap_d.addCallback(do_list)
        def scan_collective(result):
            d = defer.succeed(None)
            collective_dirmap, others_list = result
            for dir_name in others_list:
                # The scans are chained, so they run one after another.
                d.addCallback(lambda x, dir_name=dir_name: self._scan_remote(dir_name, collective_dirmap[dir_name][0]))
                # XXX todo add errback
            return d
        collective_dirmap_d.addCallback(scan_collective)
        collective_dirmap_d.addCallback(self._filter_scan_batch)
        collective_dirmap_d.addCallback(self._add_batch_to_download_queue)
        return collective_dirmap_d

    def _add_batch_to_download_queue(self, result):
        """Push the filtered ``(relpath_u, file_node, metadata)`` items onto the deque."""
        print("result = %r" % (result,))
        print("deque = %r" % (self._deque,))
        self._deque.extend(result)
        print("deque after = %r" % (self._deque,))
        self._count('objects_queued', len(result))
        print("pending = %r" % (self._pending,))
        self._pending.update(item[0] for item in result)
        print("pending after = %r" % (self._pending,))

    def _filter_scan_batch(self, result):
        """Reduce the scan batch to one highest-version candidate per path.

        Paths already pending, and paths ``_should_download`` rejects, are
        dropped. Returns a list of ``(relpath_u, file_node, metadata)``.
        """
        extension = [] # consider whether this should be a dict
        for relpath_u, candidates in self._download_scan_batch.items():
            if relpath_u in self._pending:
                continue
            file_node, metadata = max(candidates, key=lambda x: x[1]['version'])
            if self._should_download(relpath_u, metadata['version']):
                extension.append((relpath_u, file_node, metadata))
        return extension

    def _when_queue_is_empty(self):
        """Rescan the collective after ``_turn_delay``, then turn the deque again."""
        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item):
        """Download one ``(relpath_u, file_node, metadata)`` item and record it in the db."""
        (relpath_u, file_node, metadata) = item
        d = file_node.download_best_version()
        def succeeded(res):
            fp = self._get_filepath(relpath_u)
            abspath_u = unicode_from_filepath(fp)
            d2 = defer.succeed(res)
            d2.addCallback(lambda result: self._write_downloaded_file(abspath_u, result, is_conflict=False))
            def do_update_db(written_abspath_u):
                filecap = file_node.get_uri()
                written_pathinfo = get_pathinfo(written_abspath_u)
                if not written_pathinfo.exists:
                    raise Exception("downloaded file %s disappeared" % quote_local_unicode_path(written_abspath_u))

                self._db.did_upload_version(filecap, relpath_u, metadata['version'], written_pathinfo)
            d2.addCallback(do_update_db)
            # XXX handle failure here with addErrback...
            self._count('objects_downloaded')
            return d2
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_download_failed')
            return f
        d.addCallbacks(succeeded, failed)
        def remove_from_pending(res):
            # discard: a KeyError raised here would replace the real result
            # or failure travelling down the chain.
            self._pending.discard(relpath_u)
            return res
        d.addBoth(remove_from_pending)
        return d

    # Offset subtracted from "now" when setting the mtime of a downloaded file.
    FUDGE_SECONDS = 10.0

    @classmethod
    def _write_downloaded_file(cls, abspath_u, file_contents, is_conflict=False, now=None):
        """Write ``file_contents`` to a temporary file and move it into place.

        ``now`` defaults to the current time. Returns the path finally
        written: ``abspath_u``, or the ".conflict" path when ``is_conflict``
        is set or the replacement raises ``fileutil.ConflictError``.
        """
        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        fileutil.write(replacement_path_u, file_contents)
        os.utime(replacement_path_u, (now, now - cls.FUDGE_SECONDS))
        if is_conflict:
            return cls._rename_conflicted_file(abspath_u, replacement_path_u)
        try:
            fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
            return abspath_u
        except fileutil.ConflictError:
            return cls._rename_conflicted_file(abspath_u, replacement_path_u)

    @classmethod
    def _rename_conflicted_file(cls, abspath_u, replacement_path_u):
        """Move the replacement file to ``abspath_u + ".conflict"`` and return that path."""
        conflict_path_u = abspath_u + u".conflict"
        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u