]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/frontends/magic_folder.py
Split scanning into a separate method so that we can call it periodically.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / magic_folder.py
1
2 import sys, os
3 import os.path
4 from collections import deque
5 import time
6
7 from twisted.internet import defer, reactor, task
8 from twisted.python.failure import Failure
9 from twisted.python import runtime
10 from twisted.application import service
11
12 from allmydata.util import fileutil
13 from allmydata.interfaces import IDirectoryNode
14 from allmydata.util import log
15 from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
16 from allmydata.util.assertutil import precondition, _assert
17 from allmydata.util.deferredutil import HookMixin
18 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
19      extend_filepath, unicode_from_filepath, unicode_segments_from, \
20      quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
21 from allmydata.immutable.upload import FileName, Data
22 from allmydata import magicfolderdb, magicpath
23
24
# Raw Linux inotify flag IN_EXCL_UNLINK: don't deliver events for watched
# children after they have been unlinked.  Spelled as a literal (Python 2
# long) because not every inotify binding used below exposes this constant.
IN_EXCL_UNLINK = 0x04000000L
26
def get_inotify_module():
    """
    Return a filesystem-notification module appropriate for this platform.

    On Windows this is the allmydata shim; on platforms where Twisted
    supports inotify it is twisted.internet.inotify.  Raises
    NotImplementedError when no implementation is available.
    """
    on_windows = (sys.platform == "win32")
    try:
        if on_windows:
            from allmydata.windows import inotify
            return inotify
        if runtime.platform.supportsINotify():
            from twisted.internet import inotify
            return inotify
        # Neither backend applies; this NotImplementedError propagates
        # (it is not one of the exception types caught below).
        raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                  "This currently requires Linux or Windows.")
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if on_windows:
            # The Windows shim failed to import; most likely an OS too old
            # to provide the needed APIs.
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise
43
44
def is_new_file(pathinfo, db_entry):
    """
    Return True if *pathinfo* represents content not yet recorded in
    *db_entry* (no entry at all, or a differing size/ctime/mtime).

    A path that no longer exists while the db also records no size is
    treated as unchanged (both sides agree the file is absent).
    """
    if db_entry is None:
        return True

    if not pathinfo.exists and db_entry.size is None:
        return False

    observed = (pathinfo.size, pathinfo.ctime, pathinfo.mtime)
    recorded = (db_entry.size, db_entry.ctime, db_entry.mtime)
    return observed != recorded
54
55
class MagicFolder(service.MultiService):
    """
    Service coordinating one magic folder: an Uploader that pushes local
    changes to this client's upload DMD, and a Downloader that pulls
    remote changes from the collective DMD into the local directory.
    """
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, umask,
                 pending_delay=1.0, clock=None):
        """
        :param client: the Tahoe client, used to create dirnodes from caps.
        :param upload_dircap: writecap URI of this client's upload DMD.
        :param collective_dircap: readonly cap URI of the collective directory.
        :param local_path_u: absolute unicode path of the local folder.
        :param dbfile: path to the magic-folder state database.
        :param umask: umask applied when writing downloaded files.
        :param pending_delay: inotify event-coalescing delay, in seconds.
        :param clock: reactor substitute for tests; supplying one also makes
            the uploader turn its queue immediately instead of via callLater.
        :raises Exception: if the magic-folder db cannot be loaded.
        """
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        immediate = clock is not None
        clock = clock or reactor
        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            # BUGFIX: the previous code did `return Failure(...)` here, but
            # returning a non-None value from __init__ only produces an
            # opaque TypeError at instantiation.  Raise the real error.
            raise Exception('ERROR: Unable to load magic folder db.')

        # for tests
        self._client = client
        self._db = db

        upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
        self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
                                     upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending, umask)

    def startService(self):
        """Start the service and begin watching the local directory."""
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        print("%r.startService" % (self,))
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """ready is used to signal us to start
        processing the upload and download items...
        """
        d = self.uploader.start_scanning()
        d2 = self.downloader.start_scanning()
        d.addCallback(lambda ign: d2)
        return d

    def finish(self):
        """Stop both workers; the returned Deferred fires when both have stopped."""
        print("finish")
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        """Detach this service from its parent."""
        return service.MultiService.disownServiceParent(self)
108
109
class QueueMixin(HookMixin):
    """
    Shared machinery for Uploader and Downloader: a deque of pending work
    items drained one at a time by chaining callbacks onto a single
    Deferred (self._lazy_tail).

    Subclasses must define _process(item) and _when_queue_is_empty().
    Provides the 'started' and 'processed' hooks used by tests.
    """
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        # 'uploader' or 'downloader'; used in log messages and counter names.
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        # All work is serialized by appending callbacks to this Deferred.
        self._lazy_tail = defer.succeed(None)
        self._stopped = False
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        """Map a u'/'-separated relative path to a FilePath under the magic folder."""
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        """Inverse of _get_filepath: FilePath -> u'/'-separated relative path."""
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        """Adjust a stats counter namespaced as magic_folder.<name>.<counter>."""
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        # Pass-through callback: log the result and return it unchanged so
        # it can be inserted into a Deferred chain without altering it.
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _turn_deque(self):
        """Pop one item off the deque and chain its processing onto _lazy_tail.

        When the deque is empty, delegates to _when_queue_is_empty();
        otherwise processes the item, fires the 'processed' hook, and
        schedules the next turn after _turn_delay seconds.
        """
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
177
178
class Uploader(QueueMixin):
    """
    Watches the local magic folder via inotify and uploads created,
    changed, and deleted objects to the upload DMD (a writeable Tahoe
    directory), recording each uploaded version in the magic-folder db.
    """
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
                 immediate=False):
        """
        :param upload_dirnode: writeable IDirectoryNode for this client's DMD.
        :param pending_delay: inotify event-coalescing delay, in seconds.
        :param immediate: if True (tests), turn the queue synchronously
            from _notify instead of via clock.callLater.
        """
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False
        self._immediate = immediate

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()
        # Relative paths queued or being processed; used by the Downloader
        # (via is_pending) to detect conflicts with in-flight uploads.
        self._pending = set()

        # Only some notifier implementations support coalescing delays.
        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        """Start reading inotify events; fires the 'started' hook when done."""
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        """Stop watching; the returned Deferred fires after in-flight work drains."""
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        # Wait for the work chain to settle before reporting stopped.
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        """Mark the uploader ready and perform the initial full scan."""
        self._log("start_scanning")
        self.is_ready = True
        return self._full_scan()

    def _full_scan(self):
        """Scan the whole local directory, then queue db-known paths that
        the scan did not visit (normally files deleted while we were down)."""
        # Seed _pending with every path the db knows about; _scan/_process
        # will remove the ones that still exist on disk.
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending))
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already processed
            # (normally because they have been deleted on disk).
            self._log("adding %r" % (self._pending))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
        """Recursively process every child of the relative directory *reldir_u*.

        :raises Exception: when the directory cannot be listed (permissions
            or filename-encoding problems).
        """
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            _assert(isinstance(child, unicode), child=child)
            # Bind child as a default arg so each callback sees its own value.
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def is_pending(self, relpath_u):
        """Return True if *relpath_u* is queued for (or undergoing) upload."""
        return relpath_u in self._pending

    def _notify(self, opaque, path, events_mask):
        """inotify callback: filter the event and enqueue the path for upload."""
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
            return
        if relpath_u in self._pending:
            self._log("ignoring event for %r (already pending)" % (relpath_u,))
            return
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
            return

        self._log("appending %r to deque" % (relpath_u,))
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            if self._immediate:  # for tests
                self._turn_deque()
            else:
                self._clock.callLater(0, self._turn_deque)

    def _when_queue_is_empty(self):
        # The uploader is purely event-driven; nothing to do on idle.
        return defer.succeed(None)

    def _process(self, relpath_u):
        # Uploader
        """Upload one object: a deletion marker, a subdirectory (then scan
        it), or a file; record the new version in the db on success."""
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("about to remove %r from pending set %r" %
                      (relpath_u, self._pending))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')

                db_entry = self._db.get_db_entry(relpath_u)
                if db_entry is None:
                    return None

                last_downloaded_timestamp = now  # is this correct?

                if is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                # A deletion is represented remotely by an empty file with
                # 'deleted': True in its metadata.
                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                # Watch the new subdirectory unless the notifier already
                # picks up new subdirectories under a recursive watch.
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                # A directory is represented remotely by an empty file whose
                # encoded name carries a trailing (encoded) u"/".
                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u =  %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                db_entry = self._db.get_db_entry(relpath_u)

                last_downloaded_timestamp = now

                if db_entry is None:
                    new_version = 0
                elif is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry is not None and db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                self.warn("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        """Return a Deferred for the DMD metadata of *encoded_path_u*,
        or a Failure when the entry is missing."""
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        """Return a Deferred for the DMD filenode of *encoded_path_u*,
        or a Failure when the entry is missing."""
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d
460
461
class WriteFileMixin(object):
    """
    Helpers for writing downloaded content to the local filesystem,
    including conflict-marker and deleted-file renaming.
    """
    # Seconds subtracted from the replacement file's mtime so a subsequent
    # local write is seen as newer.
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        """Return the conflict-marker path for *abspath_u*."""
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        """Atomically install downloaded *file_contents* at *abspath_u*.

        :param is_conflict: when True, write to the .conflict name instead
            of replacing the file.
        :param now: timestamp override for tests; defaults to time.time().
        :return: the path actually written (destination or conflict path).
        """
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)

        # Apply the magic-folder umask while creating the directory and
        # temporary file, then restore the process umask.
        old_mask = os.umask(self._umask)
        try:
            fileutil.make_dirs(head, (~ self._umask) & 0777)
            fileutil.write(replacement_path_u, file_contents)
        finally:
            os.umask(old_mask)

        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                # The replacement raced with a local change; reclassify.
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        """Move the replacement file to the .conflict name and return that path."""
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
        if os.path.isfile(replacement_path_u):
            print "%r exists" % (replacement_path_u,)
        if os.path.isfile(conflict_path_u):
            print "%r exists" % (conflict_path_u,)

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        """Rename a remotely-deleted file to its .backup name; tolerate it
        already being gone. Returns *abspath_u*."""
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except OSError:
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u
529
530
class Downloader(QueueMixin, WriteFileMixin):
    """
    Periodically scans every DMD in the collective directory, queues
    remote objects newer than the local db's version, downloads them,
    and writes them into the local folder with conflict detection.
    """
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode,
                 upload_readonly_dircap, clock, is_upload_pending, umask):
        """
        :param collective_dirnode: readonly IDirectoryNode of the collective.
        :param upload_readonly_dircap: readonly URI of our own DMD, used to
            skip it during scans.
        :param is_upload_pending: callable(relpath_u) -> bool from the
            Uploader, used for conflict detection.
        :param umask: umask applied when writing downloaded files.
        """
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        self._upload_readonly_dircap = upload_readonly_dircap
        self._is_upload_pending = is_upload_pending
        self._umask = umask

    def start_scanning(self):
        """Perform the initial remote scan (including our own DMD) and start
        turning the download queue."""
        self._log("start_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective(scan_self=True)
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        self._turn_deque()
        return d

    def stop(self):
        """Stop turning the queue; fires after in-flight work drains."""
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        db_entry = self._db.get_db_entry(relpath_u)
        if db_entry is None:
            return True
        self._log("version %r" % (db_entry.version,))
        return (db_entry.version < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode path string checks to see if this file object
        exists in our magic-folder db; if not then return None
        else check for an entry in our magic-folder db and return the version number.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        db_entry = self._db.get_db_entry(relpath_u)
        return None if db_entry is None else db_entry.version

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with the two tuple containing a
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                # Bind dir_name as a default arg so each callback looks up
                # its own entry.
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            # Pick the (node, metadata) pair with the highest 'version';
            # returns (None, None) when no lookup succeeded.
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
        """List one participant's DMD and accumulate (filenode, metadata)
        candidates into *scan_batch* (mutated in place), keyed by relpath."""
        self._log("_scan_remote_dmd nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    if scan_batch.has_key(relpath_u):
                        scan_batch[relpath_u] += [(file_node, metadata)]
                    else:
                        scan_batch[relpath_u] = [(file_node, metadata)]

        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")
        return d

    def _scan_remote_collective(self, scan_self=False):
        """Scan every DMD in the collective (skipping our own unless
        *scan_self*), then queue the newest candidate for each path that
        _should_download approves."""
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)

            return d2
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                # Keep only the highest-version candidate for each path.
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    self._deque.append( (relpath_u, file_node, metadata) )
                else:
                    self._log("Excluding %r" % (relpath_u,))
                    self._call_hook(None, 'processed')

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
        return d

    def _when_queue_is_empty(self):
        # Idle: rescan the collective after REMOTE_SCAN_INTERVAL seconds,
        # then turn the queue again with whatever the scan enqueued.
        d = task.deferLater(self._clock, self.REMOTE_SCAN_INTERVAL, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        # Downloader
        """Download one queued (relpath, filenode, metadata) item, write it
        locally (as a conflict file if a conflict is detected), and record
        the result in the db.

        :param now: timestamp override for tests; defaults to time.time().
        """
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            # A previous conflict for this path is still unresolved; refuse
            # to touch the file until the user deals with it.
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
            # Conflict detection: the remote change conflicts with local
            # state when the DMD's last_downloaded/last_uploaded URIs
            # disagree with our db, or an upload of this path is in flight.
            is_conflict = False
            db_entry = self._db.get_db_entry(relpath_u)
            dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
            dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            if db_entry:
                if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')
                elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
                    is_conflict = True
                    self._count('objects_conflicted')
                elif self._is_upload_pending(relpath_u):
                    is_conflict = True
                    self._count('objects_conflicted')

            # A trailing u"/" marks a directory entry (see Uploader._process).
            if relpath_u.endswith(u"/"):
                if metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: file_node.download_best_version())
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            # ConflictError is an expected, already-counted outcome; don't
            # propagate it as a failure.
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d