4 from collections import deque
7 from twisted.internet import defer, reactor, task
8 from twisted.python.failure import Failure
9 from twisted.python import runtime
10 from twisted.application import service
12 from allmydata.util import fileutil
13 from allmydata.interfaces import IDirectoryNode
14 from allmydata.util import log
15 from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
16 from allmydata.util.assertutil import precondition, _assert
17 from allmydata.util.deferredutil import HookMixin
18 from allmydata.util.progress import PercentProgress
19 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
20 extend_filepath, unicode_from_filepath, unicode_segments_from, \
21 quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
22 from allmydata.immutable.upload import FileName, Data
23 from allmydata import magicfolderdb, magicpath
# Enable twisted Deferred debugging for the whole process: failures will
# carry creation/callback tracebacks, which helps diagnose magic-folder
# async flows, but this has a measurable runtime cost.
# NOTE(review): consider gating this on a debug/config flag.
defer.setDebugging(True)

# Linux inotify IN_EXCL_UNLINK flag value (0x04000000 in inotify(7)):
# stop generating events for directory children after they are unlinked.
# Defined locally (Python 2 long literal) — presumably because some
# twisted.internet.inotify versions do not export it; TODO confirm, the
# constant is not used in the visible part of this file.
IN_EXCL_UNLINK = 0x04000000L
def get_inotify_module():
    """
    Return a platform-appropriate filesystem-notification module: the
    Windows shim bundled with allmydata on win32, or twisted's inotify
    support where the platform provides it.  Raises NotImplementedError on
    unsupported platforms.

    NOTE(review): several lines of this function (the enclosing try:, an
    else: arm, and the final `return inotify`) are missing from this view
    of the file.
    """
    if sys.platform == "win32":
        from allmydata.windows import inotify
    elif runtime.platform.supportsINotify():
        from twisted.internet import inotify
    # (an else: branch appears to be missing here — this raise is the
    # unsupported-platform arm)
    raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                              "This currently requires Linux or Windows.")

    # The import can fail (e.g. pre-Vista Windows); re-raise with a
    # clearer message in that case.
    except (ImportError, AttributeError) as e:
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
def is_new_file(pathinfo, db_entry):
    """
    Decide whether the on-disk state differs from what the magic-folder db
    last recorded for this path, by comparing the (size, ctime, mtime)
    tuples.

    :param pathinfo: current on-disk state (has .exists/.size/.ctime/.mtime)
    :param db_entry: previously recorded db row (has .size/.ctime/.mtime)
    :return: True if the tuples differ, i.e. the file appears changed

    NOTE(review): the body of the first condition (presumably
    `return False` — a vanished file already recorded as deleted is not
    new) is missing from this view, as is any db_entry-is-None guard.
    """
    if not pathinfo.exists and db_entry.size is None:
    return ((pathinfo.size, pathinfo.ctime, pathinfo.mtime) !=
            (db_entry.size, db_entry.ctime, db_entry.mtime))
class MagicFolder(service.MultiService):
    """
    Top-level service tying together an Uploader and a Downloader for one
    magic folder: a local directory kept in sync via a Tahoe collective
    dircap plus a per-client upload dircap.
    """

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, umask,
                 pending_delay=1.0, clock=None):
        # local_path_u must be an absolute unicode path to the synced dir.
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        # A caller-supplied clock implies a test environment; the uploader
        # then turns its queue immediately instead of via callLater.
        immediate = clock is not None
        clock = clock or reactor
        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        # NOTE(review): a `return Failure(...)` inside __init__ is
        # meaningless — constructors ignore return values.  The guard
        # (presumably `if db is None:`) is on a line missing from this
        # view; raising an exception would be more appropriate here.
        return Failure(Exception('ERROR: Unable to load magic folder db.'))

        upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
        self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
                                     upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending, umask)

    def startService(self):
        # TODO: why is this being called more than once?
        # NOTE(review): the `if self.running:` guard before this early
        # return appears to be missing from this view.
        return defer.succeed(None)
        print "%r.startService" % (self,)
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    # NOTE(review): a `def ready(self):` header appears to be missing
    # above this docstring in this view.
        """ready is used to signal us to start
        processing the upload and download items...
        """
        self.uploader.start_uploading()  # synchronous
        return self.downloader.start_downloading()

    # NOTE(review): a `def finish(self):` (or similar) header appears to
    # be missing here; this stops both halves and chains the uploader's
    # shutdown Deferred into the downloader's.
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)

    def remove_service(self):
        # Detach this service from its parent collection.
        return service.MultiService.disownServiceParent(self)
class QueueMixin(HookMixin):
    """
    Shared machinery for Uploader/Downloader: owns the work deque, a short
    history of processed items, stats counters, logging helpers, and the
    self-rescheduling `_turn_deque` loop chained onto `_lazy_tail`.
    """

    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        # NOTE(review): assignments for db, name and clock (self._db,
        # self._name, self._clock) are not visible in this view but are
        # clearly relied upon below.
        # Hooks observed by tests: fired after each processed item and
        # when monitoring has started.
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        # Pending work items (IQueuedItem providers).
        self._deque = deque()
        # do we also want to bound on "maximum age"?
        self._process_history = deque(maxlen=10)
        # Tail of the Deferred chain that drives queue turns.
        self._lazy_tail = defer.succeed(None)
        self._stopped = False

    def get_status(self):
        """
        Returns an iterable of instances that implement IQueuedItem
        """
        # NOTE(review): the yield statements of this generator are missing
        # from this view; it walks the live queue, then the history.
        for item in self._deque:
        for item in self._process_history:

    def _get_filepath(self, relpath_u):
        # Map a folder-relative unicode path onto a FilePath under the root.
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        # Inverse of _get_filepath: FilePath -> "/"-joined relative path.
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        # Bump a stats counter namespaced by this queue's name.
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        # Log-and-passthrough helper suitable as a Deferred callback.
        self._log("%s: %r" % (msg, res))

    # NOTE(review): the `def _log(self, msg):` header and the tail of its
    # body are missing from this view.
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        #open("events", "ab+").write(msg)

    def _turn_deque(self):
        """
        Pop one item off the deque and chain its processing onto
        _lazy_tail; when the deque is empty, defer to the subclass's
        _when_queue_is_empty(); otherwise reschedule another turn after
        _turn_delay seconds.

        NOTE(review): the surrounding try:/if self._stopped:/except
        IndexError:/else: structure is only partially visible in this view.
        """
        self._log("_turn_deque")
        item = IQueuedItem(self._deque.pop())
        self._process_history.append(item)
        self._log("popped %r, now have %d" % (item, len(self._deque)))
        self._count('objects_queued', -1)
        # -- queue drained: hand off to subclass-specific idle behaviour --
        self._log("deque is now empty")
        self._lazy_tail.addBoth(self._logcb, "whawhat empty")
        self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        self._lazy_tail.addBoth(self._logcb, "got past _when_queue_is_empty")
        # -- otherwise: process the item, fire 'processed', schedule next --
        self._log("_turn_deque else clause")
        self._lazy_tail.addBoth(self._logcb, "whawhat else %r" % (item,))
        self._lazy_tail.addCallback(lambda ign: self._process(item))
        self._lazy_tail.addBoth(self._logcb, "got past _process")
        self._lazy_tail.addBoth(self._call_hook, 'processed', async=True)
        self._lazy_tail.addBoth(self._logcb, "got past _call_hook (turn_delay = %r)" % (self._turn_delay,))
        self._lazy_tail.addErrback(log.err)
        self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
        self._lazy_tail.addBoth(self._logcb, "got past deferLater")
    except Exception as e:
        self._log("---- turn deque exception %s" % (e,))
202 from zope.interface import Interface, implementer
# Marker interface for entries held in QueueMixin._deque.
# NOTE(review): the interface's docstring/body lines are missing from this
# view of the file.
class IQueuedItem(Interface):

@implementer(IQueuedItem)
class QueuedItem(object):
    """
    Base record for one unit of queued work: a folder-relative path plus a
    progress object, with a timestamped status history.
    """
    def __init__(self, relpath_u, progress):
        self.relpath_u = relpath_u     # unicode path relative to the magic folder root
        self.progress = progress       # progress object (e.g. PercentProgress)
        self._status_history = dict()  # status name -> timestamp it was reached
215 def set_status(self, status, current_time=None):
216 if current_time is None:
217 current_time = time.time()
218 self._status_history[status] = current_time
    def status_time(self, state):
        """
        Returns None if there's no status-update for 'state', else returns
        the timestamp when that state was reached.
        """
        return self._status_history.get(state, None)

    def status_history(self):
        """
        Returns a list of 2-tuples of (state, timestamp) sorted by timestamp
        """
        # NOTE(review): Python-2-only cmp-style sort; the trailing
        # `return hist` of this method is missing from this view.
        hist = self._status_history.items()
        hist.sort(lambda a, b: cmp(a[1], b[1]))


# One queued local change to be uploaded.
# NOTE(review): the body of this subclass is missing from this view.
class UploadItem(QueuedItem):
class Uploader(QueueMixin):
    """
    Watches the local directory with inotify (or the bundled Windows
    equivalent) and uploads changed files/directories into this client's
    writeable upload DMD.
    """

    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
        # NOTE(review): the end of this signature (an `immediate=False):`
        # continuation line, judging by the `immediate` use below) is
        # missing from this view.
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False
        self._immediate = immediate

        # Fail fast on misconfigured caps: we need a writeable directory.
        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()
        self._pending = set()  # of unicode relpaths

        self._periodic_full_scan_duration = 10 * 60  # perform a full scan every 10 minutes

        # Only the real Linux notifier batches events; the Windows shim
        # may not implement set_pending_delay.
        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
        # NOTE(review): the closing of this mask expression is missing
        # from this view.
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
        # NOTE(review): the trailing watch() arguments (presumably
        # recursive=True) are missing from this view.
    def start_monitoring(self):
        """
        Begin reading filesystem events; bumps the dirs_monitored counter
        and fires the 'started' hook.  Returns a Deferred.
        """
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        # NOTE(review): the `return d` and the `def stop(self):` header for
        # the shutdown code below are missing from this view.

        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        self.periodic_callid.cancel()
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        # NOTE(review): an else: appears to be missing above this line.
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)

    def start_uploading(self):
        """
        Mark the uploader ready and re-queue every path the db already
        knows about, so interrupted work is retried on startup.
        """
        self._log("start_uploading")
        # NOTE(review): the `self.is_ready = True` assignment and the
        # kick-off of the periodic full scan are missing from this view.
        all_relpaths = self._db.get_all_relpaths()
        self._log("all relpaths: %r" % (all_relpaths,))

        for relpath_u in all_relpaths:
            self._add_pending(relpath_u)

    def _extend_queue_and_keep_going(self, relpaths_u):
        """
        Wrap each relpath in an UploadItem, append it to the deque, and
        (when ready) kick the queue: immediately under test clocks, via
        callLater(0) otherwise.
        """
        self._log("_extend_queue_and_keep_going %r" % (relpaths_u,))
        for relpath_u in relpaths_u:
            progress = PercentProgress()
            item = UploadItem(relpath_u, progress)
            item.set_status('queued', self._clock.seconds())
            self._deque.append(item)

        self._count('objects_queued', len(relpaths_u))

        # NOTE(review): an `if self.is_ready:` guard, the immediate
        # `self._turn_deque()` call, and the else: are missing from this
        # view.
        if self._immediate:  # for tests
        self._clock.callLater(0, self._turn_deque)

    def _full_scan(self):
        # Periodic safety net: reschedule itself, then re-enqueue the whole
        # pending set every _periodic_full_scan_duration seconds.
        self.periodic_callid = self._clock.callLater(self._periodic_full_scan_duration, self._full_scan)
        self._log("_pending %r" % (self._pending))
        # NOTE(review): a full directory rescan appears to happen on lines
        # missing from this view before the pending set is queued.
        self._extend_queue_and_keep_going(self._pending)
333 def _add_pending(self, relpath_u):
334 self._log("add pending %r" % (relpath_u,))
335 if not magicpath.should_ignore_file(relpath_u):
336 self._pending.add(relpath_u)
    def _scan(self, reldir_u):
        # Scan a directory by (synchronously) adding the paths of all its children to self._pending.
        # Note that this doesn't add them to the deque -- that will
        # NOTE(review): the tail of the comment above and the `try:`
        # enclosing the listdir call are missing from this view.
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        for child in children:
            _assert(isinstance(child, unicode), child=child)
            # Children of the root are queued bare; deeper children as
            # "parent/child" relative paths.
            self._add_pending("%s/%s" % (reldir_u, child) if reldir_u != u"" else child)
357 def is_pending(self, relpath_u):
358 return relpath_u in self._pending
    def _notify(self, opaque, path, events_mask):
        """
        inotify callback: translate a filesystem event into queued upload
        work, filtering out non-directory IN_CREATE races, paths already
        pending, and ignorable filenames.
        """
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        # NOTE(review): the `return` statements terminating each of the
        # three guard branches below are missing from this view.
        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
        if relpath_u in self._pending:
            self._log("not queueing %r because it is already pending" % (relpath_u,))
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignoring event for %r (ignorable path)" % (relpath_u,))

        self._pending.add(relpath_u)
        self._extend_queue_and_keep_going([relpath_u])

    def _when_queue_is_empty(self):
        # The uploader is event-driven, so an empty queue simply idles
        # (contrast Downloader._when_queue_is_empty, which polls remotely).
        return defer.succeed(None)
    def _process(self, item):
        """
        Upload one queued item: depending on the current on-disk state of
        the path, record a deletion (empty uploadable + bumped version),
        create and watch a remote subdirectory, or upload the file body,
        then persist the new version in the magic-folder db.

        NOTE(review): many control-flow lines of this method (try/except
        arms, returns, else: branches, metadata kwargs) are missing from
        this view; the comments below describe only what is visible.
        """
        relpath_u = item.relpath_u
        self._log("_process(%r)" % (relpath_u,))
        item.set_status('started', self._clock.seconds())

        if relpath_u is None:
            item.set_status('invalid_path', self._clock.seconds())
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(ign, now=None):
            self._log("_maybe_upload: relpath_u=%r, now=%r" % (relpath_u, now))
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("about to remove %r from pending set %r" %
                      (relpath_u, self._pending))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            # --- path no longer exists: record the deletion remotely ---
            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')

                db_entry = self._db.get_db_entry(relpath_u)

                last_downloaded_timestamp = now  # is this correct?

                if is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                # NOTE(review): an else: (and a db_entry-is-None early
                # return) appear to be missing around here.
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')

                metadata = { 'version': new_version,
                # NOTE(review): a 'deleted': True entry appears on a line
                # missing from this view.
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                # A deletion is represented remotely as an empty file with
                # 'deleted' metadata.
                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(
                    encoded_path_u, empty_uploadable,
                    progress=item.progress,

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)

            # --- symlinks are not supported ---
            elif pathinfo.islink:
                self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))

            # --- directory: create remotely and watch/scan it locally ---
            # NOTE(review): the `elif pathinfo.isdir:` header appears to be
            # missing above this branch in this view.
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                uploadable = Data("", self._client.convergence)
                # A remote directory entry is the magic-encoded path with a
                # trailing encoded "/".
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u = %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(
                    encoded_path_u, uploadable,
                    metadata={"version": 0},
                    progress=item.progress,

                def _dir_succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                # NOTE(review): the `def _dir_failed(f):` header appears to
                # be missing above this line.
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                upload_d.addCallbacks(_dir_succeeded, _dir_failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                upload_d.addCallback(lambda ign: self._extend_queue_and_keep_going(self._pending))

            # --- regular file: upload its contents ---
            elif pathinfo.isfile:
                db_entry = self._db.get_db_entry(relpath_u)

                last_downloaded_timestamp = now

                # NOTE(review): the `if db_entry is None:` new-file arm is
                # missing above this elif in this view.
                elif is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry is not None and db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(
                    encoded_path_u, uploadable,
                    progress=item.progress,

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)

            # --- sockets/devices/etc. are skipped with a warning ---
                self.warn("WARNING: cannot process special file %s" % quote_filepath(fp))

        d.addCallback(_maybe_upload)

        # NOTE(review): the `def _succeeded(res):` / `def _failed(f):`
        # headers for the two callback bodies below are missing from this
        # view.
            self._count('objects_succeeded')
            item.set_status('success', self._clock.seconds())
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            item.set_status('failure', self._clock.seconds())
        d.addCallbacks(_succeeded, _failed)

    def _get_metadata(self, encoded_path_u):
        # Fetch the DMD metadata for one encoded path.
        # NOTE(review): the error handling and `return d` are missing from
        # this view.
        d = self._upload_dirnode.get_metadata_for(encoded_path_u)

    def _get_filenode(self, encoded_path_u):
        # Fetch the filenode for one encoded path.
        # NOTE(review): the error handling and `return d` are missing from
        # this view.
        d = self._upload_dirnode.get(encoded_path_u)
# Local-filesystem helpers shared with Downloader: writing a downloaded
# file into place (overwrite vs. conflict) and the conflict/backup renames.
# NOTE(review): class attributes (e.g. FUDGE_SECONDS, used below) are on
# lines missing from this view of the file.
class WriteFileMixin(object):
554 def _get_conflicted_filename(self, abspath_u):
555 return abspath_u + u".conflict"
    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        """
        Write downloaded bytes into place at *abspath_u* via a temporary
        file, either replacing the existing file (keeping a .backup) or —
        on conflict — parking the new content at the .conflict name.
        Returns the path of the destination file.

        NOTE(review): several lines (the umask restore, the is_conflict
        branch, and the try: around replace_file) are missing from this
        view.
        """
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)

        old_mask = os.umask(self._umask)
        fileutil.make_dirs(head, (~ self._umask) & 0777)
        fileutil.write(replacement_path_u, file_contents)
        # Backdate mtime so a subsequent local write is seen as newer.
        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        # NOTE(review): debug print statement — should be removed or routed
        # through self._log.
        print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
        return self._rename_conflicted_file(abspath_u, replacement_path_u)
        fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
        # A concurrent local change turns this overwrite into a conflict.
        except fileutil.ConflictError:
            return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        """
        Move the freshly-written replacement file to the .conflict name for
        *abspath_u* and return that conflicted path.
        """
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        # NOTE(review): the three debug print statements below should be
        # removed or routed through self._log.
        print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
        if os.path.isfile(replacement_path_u):
            print "%r exists" % (replacement_path_u,)
        if os.path.isfile(conflict_path_u):
            print "%r exists" % (conflict_path_u,)

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        """
        Rename a locally-present file that was deleted remotely to its
        .backup name; a missing file is logged and ignored.

        NOTE(review): the try:/except OSError: around the rename is on
        lines missing from this view.
        """
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        self._log("Already gone: '%s'" % (abspath_u,))
class DownloadItem(QueuedItem):
    """
    A QueuedItem describing one remote file version to download: carries
    the Tahoe filenode to read plus its DMD metadata, in addition to the
    base relpath/progress pair.
    """

    def __init__(self, relpath_u, progress, filenode, metadata):
        QueuedItem.__init__(self, relpath_u, progress)
        self.file_node = filenode
        self.metadata = metadata
class Downloader(QueueMixin, WriteFileMixin):
    """
    Periodically scans the other participants' DMDs in the collective and
    downloads newer file versions into the local directory.
    """
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode,
                 upload_readonly_dircap, clock, is_upload_pending, umask):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        # Fail fast on misconfigured caps: the collective must be a
        # read-only directory.
        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        # Our own DMD's readonly URI — used to skip our own directory when
        # scanning the collective.
        self._upload_readonly_dircap = upload_readonly_dircap
        self._is_upload_pending = is_upload_pending
        # NOTE(review): the umask/turn-delay assignments are on lines
        # missing from this view.

    def start_downloading(self):
        """
        Perform the initial scan of the whole collective (including our own
        DMD) and start turning the download queue.

        NOTE(review): the chaining of the scan into _turn_deque, and the
        `def stop(self):` header for the code at the bottom, are missing
        from this view.
        """
        self._log("start_downloading")
        self._turn_delay = self.REMOTE_SCAN_INTERVAL
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective(scan_self=True)
        d.addBoth(self._logcb, "after _scan_remote_collective 0")

        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        # NOTE(review): the `return False` for ignored paths and the
        # db_entry-is-None handling are on lines missing from this view.
        if magicpath.should_ignore_file(relpath_u):
        db_entry = self._db.get_db_entry(relpath_u)
        self._log("version %r" % (db_entry.version,))
        # Download only when the remote copy is strictly newer.
        return (db_entry.version < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode path string checks to see if this file object
        exists in our magic-folder db; if not then return None
        else check for an entry in our magic-folder db and return the version number.
        """
        # NOTE(review): the `return None` for a locally-missing file is on
        # a line missing from this view.
        if not self._get_filepath(relpath_u).exists():
        db_entry = self._db.get_db_entry(relpath_u)
        return None if db_entry is None else db_entry.version

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with the two tuple containing a
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            # Ask every participant DMD for this filename; individual
            # failures are tolerated via consumeErrors=True below.
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            # NOTE(review): the `return deferList` is on a line missing
            # from this view.
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            # Pick the (node, metadata) pair with the largest 'version'.
            # NOTE(review): the initialisation of max_version/node and the
            # per-result success guard are on lines missing from this view.
            for success, result in deferredList:
                if result[1]['version'] > max_version:
                    node, metadata = result
                    max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d
    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
        """
        List one participant's DMD and record into *scan_batch* every entry
        whose remote version is newer than (or absent from) our local db.

        NOTE(review): the `d = dirnode.list()` preceding the callback, the
        else: of the batch merge, and the `return d` are on lines missing
        from this view.
        """
        self._log("_scan_remote_dmd nickname %r" % (nickname,))

        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    # The batch may already hold candidates for this path
                    # from other participants' DMDs.
                    if scan_batch.has_key(relpath_u):
                        scan_batch[relpath_u] += [(file_node, metadata)]
                        scan_batch[relpath_u] = [(file_node, metadata)]

        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")

    def _scan_remote_collective(self, scan_self=False):
        """
        Scan every DMD in the collective (optionally including our own) and
        enqueue a DownloadItem for each path whose best remote version
        passes _should_download.

        NOTE(review): several lines (errback wiring for _err, the else:
        arms in the filter loop, DownloadItem constructor arguments, and
        the final return) are missing from this view.
        """
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                # Skip our own DMD unless scan_self was requested.
                if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            # Keep only the highest-version candidate per path, and only
            # when _should_download agrees it is newer than our db copy.
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    to_dl = DownloadItem(
                        PercentProgress(file_node.get_size()),
                    to_dl.set_status('queued', self._clock.seconds())
                    self._deque.append(to_dl)
                    self._log("Excluding %r" % (relpath_u,))
                    self._call_hook(None, 'processed', async=True)

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
    def _when_queue_is_empty(self):
        # When idle, poll the remote collective again after
        # REMOTE_SCAN_INTERVAL seconds, then turn the queue.
        # NOTE(review): the `return d` is on a line missing from this view.
        d = task.deferLater(self._clock, self.REMOTE_SCAN_INTERVAL, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())

    def _process(self, item, now=None):
        """
        Download one DownloadItem: detect conflicts against the local db
        and DMD metadata, fetch the remote content (or apply a deletion /
        mkdir), write it to the local filesystem, and record the new
        version in the magic-folder db.

        NOTE(review): this method is truncated — it continues past the end
        of the visible file — and several control-flow lines (failure
        callback header, is_conflict assignments, else: arms) are missing
        from this view.
        """
        self._log("_process(%r)" % (item,))
        if now is None:  # XXX why can we pass in now?
            now = time.time()  # self._clock.seconds()

        self._log("started! %s" % (now,))
        item.set_status('started', now)
        fp = self._get_filepath(item.relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            # Success path: persist what we wrote so it isn't re-downloaded
            # (and so the uploader recognises it as already-known).
            filecap = item.file_node.get_uri()
            last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not item.metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(
                item.relpath_u, item.metadata['version'], last_uploaded_uri,
                last_downloaded_uri, last_downloaded_timestamp, written_pathinfo,
            self._count('objects_downloaded')
            item.set_status('success', self._clock.seconds())

        # NOTE(review): the `def failed(f):` header for the error callback
        # below appears to be missing from this view.
            item.set_status('failure', self._clock.seconds())
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')

        # --- conflict detection against db / DMD bookkeeping ---
        if os.path.isfile(conflict_path_u):
            raise ConflictError("download failed: already conflicted: %r" % (item.relpath_u,))

        db_entry = self._db.get_db_entry(item.relpath_u)
        dmd_last_downloaded_uri = item.metadata.get('last_downloaded_uri', None)
        dmd_last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)

        # NOTE(review): the `is_conflict = True` assignments inside these
        # branches are on lines missing from this view.
        if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
            if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
                self._count('objects_conflicted')
        elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
            self._count('objects_conflicted')
        elif self._is_upload_pending(item.relpath_u):
            self._count('objects_conflicted')

        # A trailing "/" marks a remote directory entry.
        if item.relpath_u.endswith(u"/"):
            if item.metadata.get('deleted', False):
                self._log("rmdir(%r) ignored" % (abspath_u,))
            # NOTE(review): the else: arm for directory creation is on
            # lines missing from this view.
                self._log("mkdir(%r)" % (abspath_u,))
                d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                d.addCallback(lambda ign: abspath_u)
            if item.metadata.get('deleted', False):
                d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
            # NOTE(review): the else: arm downloading regular file content
            # is on lines missing from this view.
                d.addCallback(lambda ign: item.file_node.download_best_version(progress=item.progress))
                d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                           is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            # A ConflictError is an expected, already-handled outcome —
            # swallow it rather than treating it as a failure.
            f.trap(ConflictError)
        d.addErrback(trap_conflicts)