import sys, os
import time

from collections import deque

from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from zope.interface import Interface, Attribute, implementer

from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
from allmydata.util.assertutil import precondition, _assert
from allmydata.util.deferredutil import HookMixin
from allmydata.util.progress import PercentProgress
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
     extend_filepath, unicode_from_filepath, unicode_segments_from, \
     quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
from allmydata import magicfolderdb, magicpath

defer.setDebugging(True)
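
# IN_EXCL_UNLINK is the Linux inotify flag that suppresses events for
# children after they have been unlinked; it is presumably defined here
# because the Twisted inotify wrapper does not expose this constant.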
IN_EXCL_UNLINK = 0x04000000L

def get_inotify_module():
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise
def is_new_file(pathinfo, db_entry):
    if db_entry is None:
        return True

    if not pathinfo.exists and db_entry.size is None:
        return False

    return ((pathinfo.size, pathinfo.ctime, pathinfo.mtime) !=
            (db_entry.size, db_entry.ctime, db_entry.mtime))


class MagicFolder(service.MultiService):

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, umask,
                 pending_delay=1.0, clock=None):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        immediate = clock is not None
        clock = clock or reactor
        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            return Failure(Exception('ERROR: Unable to load magic folder db.'))

        self._client = client
        self._db = db

        upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
        self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
                                     upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending, umask)

    def startService(self):
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        print "%r.startService" % (self,)
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """ready is used to signal us to start
        processing the upload and download items...
        """
        self.uploader.start_uploading()  # synchronous
        return self.downloader.start_downloading()
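
    # Intended call order (illustrative sketch only, not from the original
    # source): construct the MagicFolder service, start it so the uploader
    # begins watching the local directory, then call ready() once queued
    # items should actually be processed, e.g.:
    #
    #   mf = MagicFolder(client, upload_dircap, collective_dircap,
    #                    local_path_u, dbfile, umask)
    #   mf.setServiceParent(client)   # startService() runs via the service machinery
    #   mf.ready()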

    def finish(self):
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        return service.MultiService.disownServiceParent(self)


class QueueMixin(HookMixin):
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        # do we also want to bound on "maximum age"?
        self._process_history = deque(maxlen=20)
        self._lazy_tail = defer.succeed(None)
        self._stopped = False

    def get_status(self):
        """
        Returns an iterable of instances that implement IQueuedItem
        """
        for item in self._deque:
            yield item
        for item in self._process_history:
            yield item

    def _get_filepath(self, relpath_u):
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        #open("events", "ab+").write(msg)
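
    # How the queue is drained: _turn_deque() pops one IQueuedItem off
    # self._deque, appends it to the bounded _process_history, runs
    # self._process(item) on the lazy tail, fires the 'processed' hook, and
    # then reschedules itself after self._turn_delay. When the deque is
    # empty it defers to _when_queue_is_empty(), which the Uploader and
    # Downloader override differently (do nothing vs. schedule another
    # remote scan).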
    def _turn_deque(self):
        try:
            self._log("_turn_deque")
            if self._stopped:
                return
            try:
                item = IQueuedItem(self._deque.pop())
                self._process_history.append(item)

                self._log("popped %r, now have %d" % (item, len(self._deque)))
                self._count('objects_queued', -1)
            except IndexError:
                self._log("deque is now empty")
                self._lazy_tail.addBoth(self._logcb, "whawhat empty")
                self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
                self._lazy_tail.addBoth(self._logcb, "got past _when_queue_is_empty")
            else:
                self._log("_turn_deque else clause")
                self._lazy_tail.addBoth(self._logcb, "whawhat else %r" % (item,))
                self._lazy_tail.addCallback(lambda ign: self._process(item))
                self._lazy_tail.addBoth(self._logcb, "got past _process")
                self._lazy_tail.addBoth(self._call_hook, 'processed', async=True)
                self._lazy_tail.addBoth(self._logcb, "got past _call_hook (turn_delay = %r)" % (self._turn_delay,))
                self._lazy_tail.addErrback(log.err)
                self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
                self._lazy_tail.addBoth(self._logcb, "got past deferLater")
        except Exception as e:
            self._log("---- turn deque exception %s" % (e,))
            raise


# this isn't in interfaces.py because it's very specific to QueueMixin
class IQueuedItem(Interface):
    relpath_u = Attribute("The path this item represents")
    progress = Attribute("A PercentProgress instance")

    def set_status(self, status, current_time=None):
        """
        Record a status change for this item at the given time
        """

    def status_time(self, state):
        """
        Get the time of particular state change, or None
        """

    def status_history(self):
        """
        All status changes, sorted latest -> oldest
        """


@implementer(IQueuedItem)
class QueuedItem(object):
    def __init__(self, relpath_u, progress):
        self.relpath_u = relpath_u
        self.progress = progress
        self._status_history = dict()

    def set_status(self, status, current_time=None):
        if current_time is None:
            current_time = time.time()
        self._status_history[status] = current_time

    def status_time(self, state):
        """
        Returns None if there's no status-update for 'state', else returns
        the timestamp when that state was reached.
        """
        return self._status_history.get(state, None)

    def status_history(self):
        """
        Returns a list of 2-tuples of (state, timestamp) sorted by timestamp
        """
        hist = self._status_history.items()
        hist.sort(lambda a, b: cmp(a[1], b[1]))
        return hist


class UploadItem(QueuedItem):
    """
    Represents a single item in the upload queue.
    """


class Uploader(QueueMixin):
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
                 immediate=False):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False
        self._immediate = immediate

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()
        self._pending = set()  # of unicode relpaths

        self._periodic_full_scan_duration = 10 * 60  # perform a full scan every 10 minutes

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        self.periodic_callid.cancel()
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_uploading(self):
        self._log("start_uploading")
        self.is_ready = True

        all_relpaths = self._db.get_all_relpaths()
        self._log("all relpaths: %r" % (all_relpaths,))

        for relpath_u in all_relpaths:
            self._add_pending(relpath_u)

        self._full_scan()

    def _extend_queue_and_keep_going(self, relpaths_u):
        self._log("_extend_queue_and_keep_going %r" % (relpaths_u,))
        for relpath_u in relpaths_u:
            progress = PercentProgress()
            item = UploadItem(relpath_u, progress)
            item.set_status('queued', self._clock.seconds())
            self._deque.append(item)

        self._count('objects_queued', len(relpaths_u))

        if self._immediate:  # for tests
            self._turn_deque()
        else:
            self._clock.callLater(0, self._turn_deque)

    def _full_scan(self):
        self.periodic_callid = self._clock.callLater(self._periodic_full_scan_duration, self._full_scan)
        self._log("_pending %r" % (self._pending))
        self._scan(u"")
        self._extend_queue_and_keep_going(self._pending)

    def _add_pending(self, relpath_u):
        self._log("add pending %r" % (relpath_u,))
        if not magicpath.should_ignore_file(relpath_u):
            self._pending.add(relpath_u)

    def _scan(self, reldir_u):
        # Scan a directory by (synchronously) adding the paths of all its children to self._pending.
        # Note that this doesn't add them to the deque -- that will
        # be done by a subsequent call to _extend_queue_and_keep_going.
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        for child in children:
            _assert(isinstance(child, unicode), child=child)
            self._add_pending("%s/%s" % (reldir_u, child) if reldir_u != u"" else child)

    def is_pending(self, relpath_u):
        return relpath_u in self._pending

    def _notify(self, opaque, path, events_mask):
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
            return
        if relpath_u in self._pending:
            self._log("not queueing %r because it is already pending" % (relpath_u,))
            return
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
            return

        self._pending.add(relpath_u)
        self._extend_queue_and_keep_going([relpath_u])
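
    # When the upload queue drains there is nothing to poll for: new work
    # only arrives from inotify events (_notify) or from the periodic full
    # scan (_full_scan), both of which re-extend the queue themselves.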
    def _when_queue_is_empty(self):
        return defer.succeed(None)

    def _process(self, item):
        relpath_u = item.relpath_u
        self._log("_process(%r)" % (relpath_u,))
        item.set_status('started', self._clock.seconds())

        if relpath_u is None:
            item.set_status('invalid_path', self._clock.seconds())
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(ign, now=None):
            self._log("_maybe_upload: relpath_u=%r, now=%r" % (relpath_u, now))
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("about to remove %r from pending set %r" %
                      (relpath_u, self._pending))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')

                db_entry = self._db.get_db_entry(relpath_u)
                if db_entry is None:
                    return None

                last_downloaded_timestamp = now  # is this correct?

                if is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(
                    encoded_path_u, empty_uploadable,
                    metadata=metadata,
                    overwrite=True,
                    progress=item.progress,
                )

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u = %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(
                    encoded_path_u, uploadable,
                    metadata={"version": 0},
                    overwrite=True,
                    progress=item.progress,
                )
                def _dir_succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _dir_failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_dir_succeeded, _dir_failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                upload_d.addCallback(lambda ign: self._extend_queue_and_keep_going(self._pending))
                return upload_d
            elif pathinfo.isfile:
                db_entry = self._db.get_db_entry(relpath_u)

                last_downloaded_timestamp = now

                if db_entry is None:
                    new_version = 0
                elif is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry is not None and db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(
                    encoded_path_u, uploadable,
                    metadata=metadata,
                    overwrite=True,
                    progress=item.progress,
                )

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                self.warn("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            item.set_status('success', self._clock.seconds())
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            item.set_status('failure', self._clock.seconds())
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        return d

    def _get_filenode(self, encoded_path_u):
        d = self._upload_dirnode.get(encoded_path_u)
        return d


class WriteFileMixin(object):
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)

        old_mask = os.umask(self._umask)
        try:
            fileutil.make_dirs(head, (~ self._umask) & 0777)
            fileutil.write(replacement_path_u, file_contents)
        finally:
            os.umask(old_mask)

        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
        if os.path.isfile(replacement_path_u):
            print "%r exists" % (replacement_path_u,)
        if os.path.isfile(conflict_path_u):
            print "%r exists" % (conflict_path_u,)

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except OSError:
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u


class DownloadItem(QueuedItem):
    def __init__(self, relpath_u, progress, filenode, metadata):
        super(DownloadItem, self).__init__(relpath_u, progress)
        self.file_node = filenode
        self.metadata = metadata


class Downloader(QueueMixin, WriteFileMixin):
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode,
                 upload_readonly_dircap, clock, is_upload_pending, umask):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        self._upload_readonly_dircap = upload_readonly_dircap
        self._is_upload_pending = is_upload_pending
        self._umask = umask

    def start_downloading(self):
        self._log("start_downloading")
        self._turn_delay = self.REMOTE_SCAN_INTERVAL
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective(scan_self=True)
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        self._turn_deque()
        return d

    def stop(self):
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        the latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            return False
        db_entry = self._db.get_db_entry(relpath_u)
        if db_entry is None:
            return True
        self._log("version %r" % (db_entry.version,))
        return (db_entry.version < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode path string and checks to see if this file object
        exists in our magic-folder db; if not then return None,
        else check for an entry in our magic-folder db and return the version number.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        db_entry = self._db.get_db_entry(relpath_u)
        return None if db_entry is None else db_entry.version

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with the two-tuple containing a
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            max_version = 0
            node = None
            metadata = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
        self._log("_scan_remote_dmd nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    if scan_batch.has_key(relpath_u):
                        scan_batch[relpath_u] += [(file_node, metadata)]
                    else:
                        scan_batch[relpath_u] = [(file_node, metadata)]
        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")
        return d
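
    # The collective scan: list the collective dirnode, walk every other
    # participant's DMD (our own only when scan_self is set), gather the
    # (filenode, metadata) candidates for each relative path into scan_batch,
    # and then queue a DownloadItem for each path whose best remote version
    # is newer than what the local magic-folder db knows about.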
    def _scan_remote_collective(self, scan_self=False):
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)
            return d2
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    to_dl = DownloadItem(
                        relpath_u,
                        PercentProgress(file_node.get_size()),
                        file_node,
                        metadata,
                    )
                    to_dl.set_status('queued', self._clock.seconds())
                    self._deque.append(to_dl)
                else:
                    self._log("Excluding %r" % (relpath_u,))
                    self._call_hook(None, 'processed', async=True)

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
        return d

    def _when_queue_is_empty(self):
        d = task.deferLater(self._clock, self.REMOTE_SCAN_INTERVAL, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        self._log("_process(%r)" % (item,))
        if now is None:  # XXX why can we pass in now?
            now = time.time()  # self._clock.seconds()

        self._log("started! %s" % (now,))
        item.set_status('started', now)
        fp = self._get_filepath(item.relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = item.file_node.get_uri()
            last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not item.metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(
                item.relpath_u, item.metadata['version'], last_uploaded_uri,
                last_downloaded_uri, last_downloaded_timestamp, written_pathinfo,
            )
            self._count('objects_downloaded')
            item.set_status('success', self._clock.seconds())

        def failed(f):
            item.set_status('failure', self._clock.seconds())
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f
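
        # A download is written out as a ".conflict" copy instead of
        # overwriting the local file when any of these hold: the remote
        # entry's last_downloaded_uri no longer matches the one in our db,
        # its last_uploaded_uri differs from ours, or an upload for the same
        # path is still pending locally.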
        if os.path.isfile(conflict_path_u):
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (item.relpath_u,))
            d.addCallback(fail)
        else:
            is_conflict = False
            db_entry = self._db.get_db_entry(item.relpath_u)
            dmd_last_downloaded_uri = item.metadata.get('last_downloaded_uri', None)
            dmd_last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)
            if db_entry:
                if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')
                elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
                    is_conflict = True
                    self._count('objects_conflicted')
                elif self._is_upload_pending(item.relpath_u):
                    is_conflict = True
                    self._count('objects_conflicted')

            if item.relpath_u.endswith(u"/"):
                if item.metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if item.metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: item.file_node.download_best_version(progress=item.progress))
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d