4 from collections import deque
7 from twisted.internet import defer, reactor, task
8 from twisted.python.failure import Failure
9 from twisted.python import runtime
10 from twisted.application import service
12 from zope.interface import Interface, Attribute, implementer
14 from allmydata.util import fileutil
15 from allmydata.interfaces import IDirectoryNode
16 from allmydata.util import log
17 from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
18 from allmydata.util.assertutil import precondition, _assert
19 from allmydata.util.deferredutil import HookMixin
20 from allmydata.util.progress import PercentProgress
21 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
22 extend_filepath, unicode_from_filepath, unicode_segments_from, \
23 quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
24 from allmydata.immutable.upload import FileName, Data
25 from allmydata import magicfolderdb, magicpath
27 defer.setDebugging(True)
28 IN_EXCL_UNLINK = 0x04000000L
30 def get_inotify_module():
32 if sys.platform == "win32":
33 from allmydata.windows import inotify
34 elif runtime.platform.supportsINotify():
35 from twisted.internet import inotify
37 raise NotImplementedError("The filesystem notification needed for Magic Folder is not supported.\n"
38 "This currently requires Linux or Windows.")
40 except (ImportError, AttributeError) as e:
42 if sys.platform == "win32":
43 raise NotImplementedError("The filesystem notification needed for Magic Folder is not supported.\n"
44 "Windows support requires at least Vista, and has only been tested on Windows 7.")
48 def is_new_file(pathinfo, db_entry):
52 if not pathinfo.exists and db_entry.size is None:
55 return ((pathinfo.size, pathinfo.ctime, pathinfo.mtime) !=
56 (db_entry.size, db_entry.ctime, db_entry.mtime))
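# is_new_file() treats a file as changed when its current (size, ctime, mtime) tuple
# differs from the tuple recorded in the magic-folder db for that path; the branch on
# line 52 special-cases a path that no longer exists when the db entry also records a
# size of None (both sides already agree the file is gone). Illustrative sketch only,
# using hypothetical namedtuple stand-ins for the real PathInfo and db records:
#
#     PathStat = namedtuple('PathStat', 'exists size ctime mtime')
#     DbEntry = namedtuple('DbEntry', 'size ctime mtime')
#     is_new_file(PathStat(True, 10, 1, 1), DbEntry(10, 1, 1))  # unchanged -> False
#     is_new_file(PathStat(True, 11, 1, 2), DbEntry(10, 1, 1))  # changed   -> True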
59 class MagicFolder(service.MultiService):
62 def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, umask,
63 pending_delay=1.0, clock=None):
64 precondition_abspath(local_path_u)
66 service.MultiService.__init__(self)
68 immediate = clock is not None
69 clock = clock or reactor
70 db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
72 raise Exception('ERROR: Unable to load magic folder db.')
78 upload_dirnode = self._client.create_node_from_uri(upload_dircap)
79 collective_dirnode = self._client.create_node_from_uri(collective_dircap)
81 self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
82 self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
83 upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending, umask)
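# The constructor wires the two halves of a magic folder together: the Uploader watches
# local_path_u and writes into our own (writeable) upload DMD, while the Downloader
# polls the read-only collective DMD. The Downloader is also handed the read-only URI
# of our upload DMD, which it uses to skip our own entries when scanning the
# collective, and the Uploader's is_pending callback, which it appears to consult when
# deciding whether an incoming download conflicts with a still-pending local upload.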
85 def startService(self):
86 # TODO: why is this being called more than once?
88 return defer.succeed(None)
89 print "%r.startService" % (self,)
90 service.MultiService.startService(self)
91 return self.uploader.start_monitoring()
94 """ready is used to signal us to start
95 processing the upload and download items...
97 self.uploader.start_uploading() # synchronous
98 return self.downloader.start_downloading()
102 d = self.uploader.stop()
103 d2 = self.downloader.stop()
104 d.addCallback(lambda ign: d2)
107 def remove_service(self):
108 return service.MultiService.disownServiceParent(self)
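# Rough lifecycle of this service, as read from the methods above: startService()
# starts the Uploader's filesystem monitoring (and, per the TODO, is defensive about
# being invoked more than once), ready() later tells both halves to begin processing
# their queues (start_uploading() synchronously, then start_downloading()), and the
# stop path chains uploader.stop() followed by downloader.stop().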
111 class QueueMixin(HookMixin):
112 def __init__(self, client, local_path_u, db, name, clock):
113 self._client = client
114 self._local_path_u = local_path_u
115 self._local_filepath = to_filepath(local_path_u)
119 self._hooks = {'processed': None, 'started': None}
120 self.started_d = self.set_hook('started')
122 if not self._local_filepath.exists():
123 raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
124 "but there is no directory at that location."
125 % quote_local_unicode_path(self._local_path_u))
126 if not self._local_filepath.isdir():
127 raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
128 "but the thing at that location is not a directory."
129 % quote_local_unicode_path(self._local_path_u))
131 self._deque = deque()
132 # do we also want to bound on "maximum age"?
133 self._process_history = deque(maxlen=20)
134 self._lazy_tail = defer.succeed(None)
135 self._stopped = False
138 def get_status(self):
140 Returns an iterable of instances that implement IQueuedItem
142 for item in self._deque:
144 for item in self._process_history:
147 def _get_filepath(self, relpath_u):
148 self._log("_get_filepath(%r)" % (relpath_u,))
149 return extend_filepath(self._local_filepath, relpath_u.split(u"/"))
151 def _get_relpath(self, filepath):
152 self._log("_get_relpath(%r)" % (filepath,))
153 segments = unicode_segments_from(filepath, self._local_filepath)
154 self._log("segments = %r" % (segments,))
155 return u"/".join(segments)
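# _get_filepath() and _get_relpath() are inverses over the magic folder root. As a
# purely illustrative example: with local_path_u == u"/home/alice/magic",
#     _get_filepath(u"docs/report.txt") -> FilePath(u"/home/alice/magic/docs/report.txt")
#     _get_relpath(<that FilePath>)     -> u"docs/report.txt"
# Relative paths are always u"/"-separated unicode, independent of the platform's
# native separator.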
157 def _count(self, counter_name, delta=1):
158 ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
159 self._log("%s += %r" % (counter_name, delta))
160 self._client.stats_provider.count(ctr, delta)
162 def _logcb(self, res, msg):
163 self._log("%s: %r" % (msg, res))
167 s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
170 #open("events", "ab+").write(msg)
172 def _turn_deque(self):
174 self._log("_turn_deque")
179 item = IQueuedItem(self._deque.pop())
180 self._process_history.append(item)
182 self._log("popped %r, now have %d" % (item, len(self._deque)))
183 self._count('objects_queued', -1)
185 self._log("deque is now empty")
186 self._lazy_tail.addBoth(self._logcb, "whawhat empty")
187 self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
188 self._lazy_tail.addBoth(self._logcb, "got past _when_queue_is_empty")
190 self._log("_turn_deque else clause")
191 self._lazy_tail.addBoth(self._logcb, "whawhat else %r" % (item,))
192 self._lazy_tail.addCallback(lambda ign: self._process(item))
193 self._lazy_tail.addBoth(self._logcb, "got past _process")
194 self._lazy_tail.addBoth(self._call_hook, 'processed', async=True)
195 self._lazy_tail.addBoth(self._logcb, "got past _call_hook (turn_delay = %r)" % (self._turn_delay,))
196 self._lazy_tail.addErrback(log.err)
197 self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
198 self._lazy_tail.addBoth(self._logcb, "got past deferLater")
199 except Exception as e:
200 self._log("---- turn deque exception %s" % (e,))
204 # this isn't in interfaces.py because it's very specific to QueueMixin
205 class IQueuedItem(Interface):
206 relpath_u = Attribute("The path this item represents")
207 progress = Attribute("A PercentProgress instance")
209 def set_status(self, status, current_time=None):
213 def status_time(self, state):
215 Get the time of a particular state change, or None
218 def status_history(self):
220 All status changes, sorted latest -> oldest
224 @implementer(IQueuedItem)
225 class QueuedItem(object):
226 def __init__(self, relpath_u, progress):
227 self.relpath_u = relpath_u
228 self.progress = progress
229 self._status_history = dict()
231 def set_status(self, status, current_time=None):
232 if current_time is None:
233 current_time = time.time()
234 self._status_history[status] = current_time
236 def status_time(self, state):
238 Returns None if there's no status-update for 'state', else returns
239 the timestamp when that state was reached.
241 return self._status_history.get(state, None)
243 def status_history(self):
245 Returns a list of 2-tuples of (state, timestamp) sorted by timestamp
247 hist = self._status_history.items()
248 hist.sort(key=lambda a: a[1])
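# Status values used elsewhere in this file are plain strings ('queued', 'started',
# 'success', 'failure', and 'invalid_path' in the Uploader), each recorded once with
# the clock time at which it was reached. A completed item's _status_history might
# therefore look like (illustrative values only):
#     {'queued': 100.0, 'started': 100.5, 'success': 101.2}
# with status_history() returning the corresponding (status, timestamp) pairs in
# timestamp order.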
252 class UploadItem(QueuedItem):
254 Represents a single item in the _deque of the Uploader
259 class Uploader(QueueMixin):
260 def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
262 QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)
264 self.is_ready = False
265 self._immediate = immediate
267 if not IDirectoryNode.providedBy(upload_dirnode):
268 raise AssertionError("The URI in '%s' does not refer to a directory."
269 % os.path.join('private', 'magic_folder_dircap'))
270 if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
271 raise AssertionError("The URI in '%s' is not a writecap to a directory."
272 % os.path.join('private', 'magic_folder_dircap'))
274 self._upload_dirnode = upload_dirnode
275 self._inotify = get_inotify_module()
276 self._notifier = self._inotify.INotify()
277 self._pending = set() # of unicode relpaths
279 self._periodic_full_scan_duration = 10 * 60 # perform a full scan every 10 minutes
281 if hasattr(self._notifier, 'set_pending_delay'):
282 self._notifier.set_pending_delay(pending_delay)
284 # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
286 self.mask = ( self._inotify.IN_CREATE
287 | self._inotify.IN_CLOSE_WRITE
288 | self._inotify.IN_MOVED_TO
289 | self._inotify.IN_MOVED_FROM
290 | self._inotify.IN_DELETE
291 | self._inotify.IN_ONLYDIR
294 self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
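# The watch set up above asks for create, close-after-write, move-in/out and delete
# events; IN_ONLYDIR makes the watch apply only if the watched path really is a
# directory. Where the notifier backend supports it (see set_pending_delay above),
# event delivery is delayed by pending_delay seconds, which appears to be intended to
# coalesce rapid successive events for the same path.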
297 def start_monitoring(self):
298 self._log("start_monitoring")
299 d = defer.succeed(None)
300 d.addCallback(lambda ign: self._notifier.startReading())
301 d.addCallback(lambda ign: self._count('dirs_monitored'))
302 d.addBoth(self._call_hook, 'started')
307 self._notifier.stopReading()
308 self._count('dirs_monitored', -1)
309 self.periodic_callid.cancel()
310 if hasattr(self._notifier, 'wait_until_stopped'):
311 d = self._notifier.wait_until_stopped()
313 d = defer.succeed(None)
314 d.addCallback(lambda ign: self._lazy_tail)
317 def start_uploading(self):
318 self._log("start_uploading")
321 all_relpaths = self._db.get_all_relpaths()
322 self._log("all relpaths: %r" % (all_relpaths,))
324 for relpath_u in all_relpaths:
325 self._add_pending(relpath_u)
329 def _extend_queue_and_keep_going(self, relpaths_u):
330 self._log("_extend_queue_and_keep_going %r" % (relpaths_u,))
331 for relpath_u in relpaths_u:
332 progress = PercentProgress()
333 item = UploadItem(relpath_u, progress)
334 item.set_status('queued', self._clock.seconds())
335 self._deque.append(item)
337 self._count('objects_queued', len(relpaths_u))
340 if self._immediate: # for tests
343 self._clock.callLater(0, self._turn_deque)
345 def _full_scan(self):
346 self.periodic_callid = self._clock.callLater(self._periodic_full_scan_duration, self._full_scan)
348 self._log("_pending %r" % (self._pending))
350 self._extend_queue_and_keep_going(self._pending)
352 def _add_pending(self, relpath_u):
353 self._log("add pending %r" % (relpath_u,))
354 if not magicpath.should_ignore_file(relpath_u):
355 self._pending.add(relpath_u)
357 def _scan(self, reldir_u):
358 # Scan a directory by (synchronously) adding the paths of all its children to self._pending.
359 # Note that this doesn't add them to the deque -- that will
361 self._log("scan %r" % (reldir_u,))
362 fp = self._get_filepath(reldir_u)
364 children = listdir_filepath(fp)
365 except EnvironmentError:
366 raise Exception("WARNING: magic folder: permission denied on directory %s"
367 % quote_filepath(fp))
368 except FilenameEncodingError:
369 raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
370 % quote_filepath(fp))
372 for child in children:
373 _assert(isinstance(child, unicode), child=child)
374 self._add_pending("%s/%s" % (reldir_u, child) if reldir_u != u"" else child)
376 def is_pending(self, relpath_u):
377 return relpath_u in self._pending
379 def _notify(self, opaque, path, events_mask):
380 self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
381 relpath_u = self._get_relpath(path)
383 # We filter out IN_CREATE events not associated with a directory.
384 # Acting on IN_CREATE for files could cause us to read and upload
385 # a possibly-incomplete file before the application has closed it.
386 # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
387 # It isn't possible to avoid watching for IN_CREATE at all, because
388 # it is the only event notified for a directory creation.
390 if ((events_mask & self._inotify.IN_CREATE) != 0 and
391 (events_mask & self._inotify.IN_ISDIR) == 0):
392 self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
394 if relpath_u in self._pending:
395 self._log("not queueing %r because it is already pending" % (relpath_u,))
397 if magicpath.should_ignore_file(relpath_u):
398 self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
401 self._pending.add(relpath_u)
402 self._extend_queue_and_keep_going([relpath_u])
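# _notify() applies three filters before queueing anything: IN_CREATE events for
# non-directories are ignored (the eventual IN_CLOSE_WRITE covers the finished file,
# as the comment above explains), paths already in self._pending are not queued a
# second time, and paths matched by magicpath.should_ignore_file() are dropped.
# Anything that survives is added to _pending and pushed onto the deque via
# _extend_queue_and_keep_going().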
404 def _when_queue_is_empty(self):
405 return defer.succeed(None)
407 def _process(self, item):
409 relpath_u = item.relpath_u
410 self._log("_process(%r)" % (relpath_u,))
411 item.set_status('started', self._clock.seconds())
413 if relpath_u is None:
414 item.set_status('invalid_path', self._clock.seconds())
416 precondition(isinstance(relpath_u, unicode), relpath_u)
417 precondition(not relpath_u.endswith(u'/'), relpath_u)
419 d = defer.succeed(None)
421 def _maybe_upload(ign, now=None):
422 self._log("_maybe_upload: relpath_u=%r, now=%r" % (relpath_u, now))
425 fp = self._get_filepath(relpath_u)
426 pathinfo = get_pathinfo(unicode_from_filepath(fp))
428 self._log("about to remove %r from pending set %r" %
429 (relpath_u, self._pending))
430 self._pending.remove(relpath_u)
431 encoded_path_u = magicpath.path2magic(relpath_u)
433 if not pathinfo.exists:
434 # FIXME merge this with the 'isfile' case.
435 self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
436 self._count('objects_disappeared')
438 db_entry = self._db.get_db_entry(relpath_u)
442 last_downloaded_timestamp = now # is this correct?
444 if is_new_file(pathinfo, db_entry):
445 new_version = db_entry.version + 1
447 self._log("Not uploading %r" % (relpath_u,))
448 self._count('objects_not_uploaded')
451 metadata = { 'version': new_version,
453 'last_downloaded_timestamp': last_downloaded_timestamp }
454 if db_entry.last_downloaded_uri is not None:
455 metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri
457 empty_uploadable = Data("", self._client.convergence)
458 d2 = self._upload_dirnode.add_file(
459 encoded_path_u, empty_uploadable,
462 progress=item.progress,
465 def _add_db_entry(filenode):
466 filecap = filenode.get_uri()
467 last_downloaded_uri = metadata.get('last_downloaded_uri', None)
468 self._db.did_upload_version(relpath_u, new_version, filecap,
469 last_downloaded_uri, last_downloaded_timestamp,
471 self._count('files_uploaded')
472 d2.addCallback(_add_db_entry)
474 elif pathinfo.islink:
475 self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
479 if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
480 self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)
482 uploadable = Data("", self._client.convergence)
483 encoded_path_u += magicpath.path2magic(u"/")
484 self._log("encoded_path_u = %r" % (encoded_path_u,))
485 upload_d = self._upload_dirnode.add_file(
486 encoded_path_u, uploadable,
487 metadata={"version": 0},
489 progress=item.progress,
491 def _dir_succeeded(ign):
492 self._log("created subdirectory %r" % (relpath_u,))
493 self._count('directories_created')
495 self._log("failed to create subdirectory %r" % (relpath_u,))
497 upload_d.addCallbacks(_dir_succeeded, _dir_failed)
498 upload_d.addCallback(lambda ign: self._scan(relpath_u))
499 upload_d.addCallback(lambda ign: self._extend_queue_and_keep_going(self._pending))
501 elif pathinfo.isfile:
502 db_entry = self._db.get_db_entry(relpath_u)
504 last_downloaded_timestamp = now
508 elif is_new_file(pathinfo, db_entry):
509 new_version = db_entry.version + 1
511 self._log("Not uploading %r" % (relpath_u,))
512 self._count('objects_not_uploaded')
515 metadata = { 'version': new_version,
516 'last_downloaded_timestamp': last_downloaded_timestamp }
517 if db_entry is not None and db_entry.last_downloaded_uri is not None:
518 metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri
520 uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
521 d2 = self._upload_dirnode.add_file(
522 encoded_path_u, uploadable,
525 progress=item.progress,
528 def _add_db_entry(filenode):
529 filecap = filenode.get_uri()
530 last_downloaded_uri = metadata.get('last_downloaded_uri', None)
531 self._db.did_upload_version(relpath_u, new_version, filecap,
532 last_downloaded_uri, last_downloaded_timestamp,
534 self._count('files_uploaded')
535 d2.addCallback(_add_db_entry)
538 self.warn("WARNING: cannot process special file %s" % quote_filepath(fp))
541 d.addCallback(_maybe_upload)
544 self._count('objects_succeeded')
545 item.set_status('success', self._clock.seconds())
548 self._count('objects_failed')
549 self._log("%s while processing %r" % (f, relpath_u))
550 item.set_status('failure', self._clock.seconds())
552 d.addCallbacks(_succeeded, _failed)
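# Each upload performed in _maybe_upload() stores metadata alongside the new version
# in the upload DMD and mirrors it into the local db via did_upload_version(). From
# the branches shown above, that metadata carries at least (illustrative shape only):
#     {'version': <db_entry.version + 1 in the branches shown>,
#      'last_downloaded_timestamp': <now>,
#      'last_downloaded_uri': <copied from the db entry, when one exists>}
# The Downloader later compares these fields against its own db entry when deciding
# between an overwrite and a conflict.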
555 def _get_metadata(self, encoded_path_u):
557 d = self._upload_dirnode.get_metadata_for(encoded_path_u)
562 def _get_filenode(self, encoded_path_u):
564 d = self._upload_dirnode.get(encoded_path_u)
570 class WriteFileMixin(object):
573 def _get_conflicted_filename(self, abspath_u):
574 return abspath_u + u".conflict"
576 def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
577 self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
578 % (abspath_u, len(file_contents), is_conflict, now))
580 # 1. Write a temporary file, say .foo.tmp.
581 # 2. is_conflict determines whether this is an overwrite or a conflict.
582 # 3. Set the mtime of the replacement file to be T seconds before the
583 # current local time.
584 # 4. Perform a file replacement with backup filename foo.backup,
585 # replaced file foo, and replacement file .foo.tmp. If any step of
586 # this operation fails, reclassify as a conflict and stop.
588 # Returns the path of the destination file.
590 precondition_abspath(abspath_u)
591 replacement_path_u = abspath_u + u".tmp" # FIXME more unique
592 backup_path_u = abspath_u + u".backup"
596 # ensure parent directory exists
597 head, tail = os.path.split(abspath_u)
599 old_mask = os.umask(self._umask)
601 fileutil.make_dirs(head, (~ self._umask) & 0777)
602 fileutil.write(replacement_path_u, file_contents)
606 os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
608 print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
609 return self._rename_conflicted_file(abspath_u, replacement_path_u)
612 fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
614 except fileutil.ConflictError:
615 return self._rename_conflicted_file(abspath_u, replacement_path_u)
617 def _rename_conflicted_file(self, abspath_u, replacement_path_u):
618 self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))
620 conflict_path_u = self._get_conflicted_filename(abspath_u)
621 print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
622 if os.path.isfile(replacement_path_u):
623 print "%r exists" % (replacement_path_u,)
624 if os.path.isfile(conflict_path_u):
625 print "%r exists" % (conflict_path_u,)
627 fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
628 return conflict_path_u
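# Naming convention used by the write/replace path above, shown for a hypothetical
# file "report.txt" inside the magic folder:
#     report.txt          - the live file
#     report.txt.tmp      - the freshly downloaded replacement, written first
#     report.txt.backup   - where replace_file() appears to preserve the old contents
#     report.txt.conflict - where the replacement lands if the item is conflicted,
#                           or if the atomic replace itself raises ConflictError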
630 def _rename_deleted_file(self, abspath_u):
631 self._log('renaming deleted file to backup: %s' % (abspath_u,))
633 fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
635 self._log("Already gone: '%s'" % (abspath_u,))
639 class DownloadItem(QueuedItem):
641 Represents a single item in the _deque of the Downloader
643 def __init__(self, relpath_u, progress, filenode, metadata):
644 super(DownloadItem, self).__init__(relpath_u, progress)
645 self.file_node = filenode
646 self.metadata = metadata
649 class Downloader(QueueMixin, WriteFileMixin):
650 REMOTE_SCAN_INTERVAL = 3 # facilitates tests
652 def __init__(self, client, local_path_u, db, collective_dirnode,
653 upload_readonly_dircap, clock, is_upload_pending, umask):
654 QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)
656 if not IDirectoryNode.providedBy(collective_dirnode):
657 raise AssertionError("The URI in '%s' does not refer to a directory."
658 % os.path.join('private', 'collective_dircap'))
659 if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
660 raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
661 % os.path.join('private', 'collective_dircap'))
663 self._collective_dirnode = collective_dirnode
664 self._upload_readonly_dircap = upload_readonly_dircap
665 self._is_upload_pending = is_upload_pending
668 def start_downloading(self):
669 self._log("start_downloading")
670 self._turn_delay = self.REMOTE_SCAN_INTERVAL
671 files = self._db.get_all_relpaths()
672 self._log("all files %s" % files)
674 d = self._scan_remote_collective(scan_self=True)
675 d.addBoth(self._logcb, "after _scan_remote_collective 0")
682 d = defer.succeed(None)
683 d.addCallback(lambda ign: self._lazy_tail)
686 def _should_download(self, relpath_u, remote_version):
688 _should_download returns a bool indicating whether or not a remote object should be downloaded.
689 We check the remote metadata version against our magic-folder db version number;
692 self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
693 if magicpath.should_ignore_file(relpath_u):
697 db_entry = self._db.get_db_entry(relpath_u)
700 self._log("version %r" % (db_entry.version,))
701 return (db_entry.version < remote_version)
703 def _get_local_latest(self, relpath_u):
705 _get_local_latest takes a unicode relative path and checks whether the
706 corresponding file exists on disk; if not, it returns None,
707 otherwise it looks up the entry in our magic-folder db and returns its version number.
709 if not self._get_filepath(relpath_u).exists():
711 db_entry = self._db.get_db_entry(relpath_u)
712 return None if db_entry is None else db_entry.version
714 def _get_collective_latest_file(self, filename):
716 _get_collective_latest_file takes a file path pointing to a file managed by
717 magic-folder and returns a deferred that fires with a two-tuple containing a
718 file node and metadata for the latest version of the file located in the
719 magic-folder collective directory.
721 collective_dirmap_d = self._collective_dirnode.list()
722 def scan_collective(result):
723 list_of_deferreds = []
724 for dir_name in result.keys():
725 # XXX make sure it's a directory
726 d = defer.succeed(None)
727 d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
728 list_of_deferreds.append(d)
729 deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
731 collective_dirmap_d.addCallback(scan_collective)
732 def highest_version(deferredList):
736 for success, result in deferredList:
738 if result[1]['version'] > max_version:
739 node, metadata = result
740 max_version = result[1]['version']
741 return node, metadata
742 collective_dirmap_d.addCallback(highest_version)
743 return collective_dirmap_d
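# _get_collective_latest_file() fans out one get_child_and_metadata(filename) call per
# participant directory in the collective, gathers the results with a DeferredList
# (consumeErrors=True, so a participant that lacks the file simply drops out), and
# keeps the (node, metadata) pair whose metadata['version'] is highest.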
745 def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
746 self._log("_scan_remote_dmd nickname %r" % (nickname,))
748 def scan_listing(listing_map):
749 for encoded_relpath_u in listing_map.keys():
750 relpath_u = magicpath.magic2path(encoded_relpath_u)
751 self._log("found %r" % (relpath_u,))
753 file_node, metadata = listing_map[encoded_relpath_u]
754 local_version = self._get_local_latest(relpath_u)
755 remote_version = metadata.get('version', None)
756 self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))
758 if local_version is None or remote_version is None or local_version < remote_version:
759 self._log("%r added to download queue" % (relpath_u,))
760 if relpath_u in scan_batch:
761 scan_batch[relpath_u] += [(file_node, metadata)]
763 scan_batch[relpath_u] = [(file_node, metadata)]
765 d.addCallback(scan_listing)
766 d.addBoth(self._logcb, "end of _scan_remote_dmd")
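# _scan_remote_dmd() does not queue downloads directly; it appends (file_node, metadata)
# candidates into the shared scan_batch dict, keyed by relpath, whenever our local
# version is absent, the remote metadata carries no version, or the remote version is
# newer. _scan_remote_collective() below then picks the highest-version candidate per
# path and runs it through _should_download() before queueing a DownloadItem.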
769 def _scan_remote_collective(self, scan_self=False):
770 self._log("_scan_remote_collective")
771 scan_batch = {} # path -> [(filenode, metadata)]
773 d = self._collective_dirnode.list()
774 def scan_collective(dirmap):
775 d2 = defer.succeed(None)
776 for dir_name in dirmap:
777 (dirnode, metadata) = dirmap[dir_name]
778 if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
779 d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
780 self._scan_remote_dmd(dir_name, dirnode, scan_batch))
781 def _err(f, dir_name=dir_name):
782 self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
783 # XXX what should we do to make this failure more visible to users?
787 d.addCallback(scan_collective)
789 def _filter_batch_to_deque(ign):
790 self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
791 for relpath_u in scan_batch.keys():
792 file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])
794 if self._should_download(relpath_u, metadata['version']):
795 to_dl = DownloadItem(
797 PercentProgress(file_node.get_size()),
801 to_dl.set_status('queued', self._clock.seconds())
802 self._deque.append(to_dl)
804 self._log("Excluding %r" % (relpath_u,))
805 self._call_hook(None, 'processed', async=True)
807 self._log("deque after = %r" % (self._deque,))
808 d.addCallback(_filter_batch_to_deque)
811 def _when_queue_is_empty(self):
812 d = task.deferLater(self._clock, self.REMOTE_SCAN_INTERVAL, self._scan_remote_collective)
813 d.addBoth(self._logcb, "after _scan_remote_collective 1")
814 d.addCallback(lambda ign: self._turn_deque())
817 def _process(self, item, now=None):
819 self._log("_process(%r)" % (item,))
820 if now is None: # XXX why can we pass in now?
821 now = time.time() # self._clock.seconds()
823 self._log("started! %s" % (now,))
824 item.set_status('started', now)
825 fp = self._get_filepath(item.relpath_u)
826 abspath_u = unicode_from_filepath(fp)
827 conflict_path_u = self._get_conflicted_filename(abspath_u)
829 d = defer.succeed(None)
831 def do_update_db(written_abspath_u):
832 filecap = item.file_node.get_uri()
833 last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)
834 last_downloaded_uri = filecap
835 last_downloaded_timestamp = now
836 written_pathinfo = get_pathinfo(written_abspath_u)
838 if not written_pathinfo.exists and not item.metadata.get('deleted', False):
839 raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))
841 self._db.did_upload_version(
842 item.relpath_u, item.metadata['version'], last_uploaded_uri,
843 last_downloaded_uri, last_downloaded_timestamp, written_pathinfo,
845 self._count('objects_downloaded')
846 item.set_status('success', self._clock.seconds())
849 item.set_status('failure', self._clock.seconds())
850 self._log("download failed: %s" % (str(f),))
851 self._count('objects_failed')
854 if os.path.isfile(conflict_path_u):
856 raise ConflictError("download failed: already conflicted: %r" % (item.relpath_u,))
860 db_entry = self._db.get_db_entry(item.relpath_u)
861 dmd_last_downloaded_uri = item.metadata.get('last_downloaded_uri', None)
862 dmd_last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)
864 if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
865 if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
867 self._count('objects_conflicted')
868 elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
870 self._count('objects_conflicted')
871 elif self._is_upload_pending(item.relpath_u):
873 self._count('objects_conflicted')
875 if item.relpath_u.endswith(u"/"):
876 if item.metadata.get('deleted', False):
877 self._log("rmdir(%r) ignored" % (abspath_u,))
879 self._log("mkdir(%r)" % (abspath_u,))
880 d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
881 d.addCallback(lambda ign: abspath_u)
883 if item.metadata.get('deleted', False):
884 d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
886 d.addCallback(lambda ign: item.file_node.download_best_version(progress=item.progress))
887 d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
888 is_conflict=is_conflict))
890 d.addCallbacks(do_update_db, failed)
892 def trap_conflicts(f):
893 f.trap(ConflictError)
895 d.addErrback(trap_conflicts)
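# Summary of the conflict handling visible in _process() above: a download is treated
# as conflicted when the collective entry's last_downloaded_uri no longer matches the
# one recorded in our db, when its last_uploaded_uri differs from ours, or when a local
# upload for the same relpath is still pending; and if a .conflict file already exists
# for the target, the item is abandoned early with a ConflictError. trap_conflicts()
# then traps that ConflictError so a conflicted item is not reported as a queue failure.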