# Source: git.rkrishnan.org mirror of tahoe-lafs/tahoe-lafs.git
# blob be468ea8874d12a73939a22ae2c29a2e3b147fd4
# src/allmydata/frontends/magic_folder.py
2 import sys, os
3 import os.path
4 from collections import deque
5 import time
6
7 from twisted.internet import defer, reactor, task
8 from twisted.python.failure import Failure
9 from twisted.python import runtime
10 from twisted.application import service
11
12 from allmydata.util import fileutil
13 from allmydata.interfaces import IDirectoryNode
14 from allmydata.util import log
15 from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
16 from allmydata.util.assertutil import precondition, _assert
17 from allmydata.util.deferredutil import HookMixin
18 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
19      extend_filepath, unicode_from_filepath, unicode_segments_from, \
20      quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
21 from allmydata.immutable.upload import FileName, Data
22 from allmydata import magicfolderdb, magicpath
23
24
# Linux inotify flag IN_EXCL_UNLINK: stop generating events for watched
# children once they have been unlinked.  Hard-coded (Python 2 long literal)
# because not every inotify binding in use here exports this constant.
IN_EXCL_UNLINK = 0x04000000L
26
def get_inotify_module():
    """Return the filesystem-notification module for this platform.

    Windows uses allmydata.windows.inotify; platforms where Twisted
    supports inotify use twisted.internet.inotify.  Anything else (or a
    failed import) raises NotImplementedError.
    """
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
            return inotify
        if runtime.platform.supportsINotify():
            from twisted.internet import inotify
            return inotify
        raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                  "This currently requires Linux or Windows.")
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform != "win32":
            raise
        # On Windows the import can fail on pre-Vista systems.
        raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                  "Windows support requires at least Vista, and has only been tested on Windows 7.")
43
44
def is_new_file(pathinfo, db_entry):
    """Return True when the local file state differs from the recorded db entry.

    No db entry at all means the file is new.  A nonexistent local file
    matching a db entry whose size is None is not new (both 'absent').
    Otherwise compare (size, ctime, mtime).
    """
    if db_entry is None:
        return True

    if db_entry.size is None and not pathinfo.exists:
        return False

    local_state = (pathinfo.size, pathinfo.ctime, pathinfo.mtime)
    recorded_state = (db_entry.size, db_entry.ctime, db_entry.mtime)
    return local_state != recorded_state
54
55
56 class MagicFolder(service.MultiService):
57     name = 'magic-folder'
58
59     def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, umask,
60                  pending_delay=1.0, clock=None):
61         precondition_abspath(local_path_u)
62
63         service.MultiService.__init__(self)
64
65         immediate = clock is not None
66         clock = clock or reactor
67         db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
68         if db is None:
69             return Failure(Exception('ERROR: Unable to load magic folder db.'))
70
71         # for tests
72         self._client = client
73         self._db = db
74
75         upload_dirnode = self._client.create_node_from_uri(upload_dircap)
76         collective_dirnode = self._client.create_node_from_uri(collective_dircap)
77
78         self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
79         self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
80                                      upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending, umask)
81
82     def startService(self):
83         # TODO: why is this being called more than once?
84         if self.running:
85             return defer.succeed(None)
86         print "%r.startService" % (self,)
87         service.MultiService.startService(self)
88         return self.uploader.start_monitoring()
89
90     def ready(self):
91         """ready is used to signal us to start
92         processing the upload and download items...
93         """
94         self.uploader.start_uploading()  # synchronous
95         return self.downloader.start_downloading()
96
97     def finish(self):
98         print "finish"
99         d = self.uploader.stop()
100         d2 = self.downloader.stop()
101         d.addCallback(lambda ign: d2)
102         return d
103
104     def remove_service(self):
105         return service.MultiService.disownServiceParent(self)
106
107
class QueueMixin(HookMixin):
    """
    Shared machinery for Uploader and Downloader: a work deque of pending
    items plus a lazily-chained Deferred (`_lazy_tail`) that serializes
    their processing one turn at a time.
    """
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name                 # 'uploader' or 'downloader'; used in log/counter names
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        # The configured local directory must exist and be a directory
        # before either queue can operate on it.
        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()             # pending work items
        self._lazy_tail = defer.succeed(None)  # serializes _process calls
        self._stopped = False
        self._turn_delay = 0              # seconds between queue turns

    def _get_filepath(self, relpath_u):
        """Map a u'/'-separated relative path to a FilePath under the local directory."""
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        """Inverse of _get_filepath: FilePath -> u'/'-joined relative path."""
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        """Bump a stats counter namespaced by queue name."""
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        # Deferred-callback-shaped logger: logs and passes the result through.
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _turn_deque(self):
        """Take one item off the deque and chain its processing onto _lazy_tail.

        When the deque is empty, chains _when_queue_is_empty() (subclass
        hook) instead.  Note the pop order is LIFO (deque.pop takes the
        most recently queued item).
        """
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            # Order matters: process, fire 'processed' hook (even on
            # failure, via addBoth), swallow/log errors, then schedule the
            # next turn after _turn_delay.
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
175
176
177 class Uploader(QueueMixin):
178     def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
179                  immediate=False):
180         QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)
181
182         self.is_ready = False
183         self._immediate = immediate
184
185         if not IDirectoryNode.providedBy(upload_dirnode):
186             raise AssertionError("The URI in '%s' does not refer to a directory."
187                                  % os.path.join('private', 'magic_folder_dircap'))
188         if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
189             raise AssertionError("The URI in '%s' is not a writecap to a directory."
190                                  % os.path.join('private', 'magic_folder_dircap'))
191
192         self._upload_dirnode = upload_dirnode
193         self._inotify = get_inotify_module()
194         self._notifier = self._inotify.INotify()
195         self._pending = set()  # of unicode relpaths
196
197         self._periodic_full_scan_duration = 10 * 60 # perform a full scan every 10 minutes
198
199         if hasattr(self._notifier, 'set_pending_delay'):
200             self._notifier.set_pending_delay(pending_delay)
201
202         # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
203         #
204         self.mask = ( self._inotify.IN_CREATE
205                     | self._inotify.IN_CLOSE_WRITE
206                     | self._inotify.IN_MOVED_TO
207                     | self._inotify.IN_MOVED_FROM
208                     | self._inotify.IN_DELETE
209                     | self._inotify.IN_ONLYDIR
210                     | IN_EXCL_UNLINK
211                     )
212         self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
213                              recursive=True)
214
215     def start_monitoring(self):
216         self._log("start_monitoring")
217         d = defer.succeed(None)
218         d.addCallback(lambda ign: self._notifier.startReading())
219         d.addCallback(lambda ign: self._count('dirs_monitored'))
220         d.addBoth(self._call_hook, 'started')
221         return d
222
223     def stop(self):
224         self._log("stop")
225         self._notifier.stopReading()
226         self._count('dirs_monitored', -1)
227         self.periodic_callid.cancel()
228         if hasattr(self._notifier, 'wait_until_stopped'):
229             d = self._notifier.wait_until_stopped()
230         else:
231             d = defer.succeed(None)
232         d.addCallback(lambda ign: self._lazy_tail)
233         return d
234
235     def start_uploading(self):
236         self._log("start_uploading")
237         self.is_ready = True
238
239         all_relpaths = self._db.get_all_relpaths()
240         self._log("all relpaths: %r" % (all_relpaths,))
241
242         for relpath_u in all_relpaths:
243             self._add_pending(relpath_u)
244
245         self._periodic_full_scan(ignore_pending=True)
246         self._extend_queue_and_keep_going(self._pending)
247
248     def _extend_queue_and_keep_going(self, relpaths_u):
249         self._log("queueing %r" % (relpaths_u,))
250         self._deque.extend(relpaths_u)
251         self._count('objects_queued', len(relpaths_u))
252
253         if self.is_ready:
254             if self._immediate:  # for tests
255                 self._turn_deque()
256             else:
257                 self._clock.callLater(0, self._turn_deque)
258
259     def _periodic_full_scan(self, ignore_pending=False):
260         self.periodic_callid = self._clock.callLater(self._periodic_full_scan_duration, self._periodic_full_scan)
261         if ignore_pending:
262             self._full_scan()
263         else:
264             if len(self._pending) == 0:
265                 self._full_scan()
266
267     def _full_scan(self):
268         print "FULL SCAN"
269         self._log("_pending %r" % (self._pending))
270         self._scan(u"")
271
272     def _add_pending(self, relpath_u):
273         if not magicpath.should_ignore_file(relpath_u):
274             self._pending.add(relpath_u)
275
276     def _scan(self, reldir_u):
277         # Scan a directory by (synchronously) adding the paths of all its children to self._pending.
278         # Note that this doesn't add them to the deque -- that will
279
280         self._log("scan %r" % (reldir_u,))
281         fp = self._get_filepath(reldir_u)
282         try:
283             children = listdir_filepath(fp)
284         except EnvironmentError:
285             raise Exception("WARNING: magic folder: permission denied on directory %s"
286                             % quote_filepath(fp))
287         except FilenameEncodingError:
288             raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
289                             % quote_filepath(fp))
290
291         for child in children:
292             _assert(isinstance(child, unicode), child=child)
293             self._add_pending("%s/%s" % (reldir_u, child) if reldir_u != u"" else child)
294
295     def is_pending(self, relpath_u):
296         return relpath_u in self._pending
297
298     def _notify(self, opaque, path, events_mask):
299         self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
300         relpath_u = self._get_relpath(path)
301
302         # We filter out IN_CREATE events not associated with a directory.
303         # Acting on IN_CREATE for files could cause us to read and upload
304         # a possibly-incomplete file before the application has closed it.
305         # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
306         # It isn't possible to avoid watching for IN_CREATE at all, because
307         # it is the only event notified for a directory creation.
308
309         if ((events_mask & self._inotify.IN_CREATE) != 0 and
310             (events_mask & self._inotify.IN_ISDIR) == 0):
311             self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
312             return
313         if relpath_u in self._pending:
314             self._log("not queueing %r because it is already pending" % (relpath_u,))
315             return
316         if magicpath.should_ignore_file(relpath_u):
317             self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
318             return
319
320         self._pending.add(relpath_u)
321         self._extend_queue_and_keep_going([relpath_u])
322
323     def _when_queue_is_empty(self):
324         return defer.succeed(None)
325
326     def _process(self, relpath_u):
327         # Uploader
328         self._log("_process(%r)" % (relpath_u,))
329         if relpath_u is None:
330             return
331         precondition(isinstance(relpath_u, unicode), relpath_u)
332         precondition(not relpath_u.endswith(u'/'), relpath_u)
333
334         d = defer.succeed(None)
335
336         def _maybe_upload(val, now=None):
337             if now is None:
338                 now = time.time()
339             fp = self._get_filepath(relpath_u)
340             pathinfo = get_pathinfo(unicode_from_filepath(fp))
341
342             self._log("about to remove %r from pending set %r" %
343                       (relpath_u, self._pending))
344             self._pending.remove(relpath_u)
345             encoded_path_u = magicpath.path2magic(relpath_u)
346
347             if not pathinfo.exists:
348                 # FIXME merge this with the 'isfile' case.
349                 self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
350                 self._count('objects_disappeared')
351
352                 db_entry = self._db.get_db_entry(relpath_u)
353                 if db_entry is None:
354                     return None
355
356                 last_downloaded_timestamp = now  # is this correct?
357
358                 if is_new_file(pathinfo, db_entry):
359                     new_version = db_entry.version + 1
360                 else:
361                     self._log("Not uploading %r" % (relpath_u,))
362                     self._count('objects_not_uploaded')
363                     return
364
365                 metadata = { 'version': new_version,
366                              'deleted': True,
367                              'last_downloaded_timestamp': last_downloaded_timestamp }
368                 if db_entry.last_downloaded_uri is not None:
369                     metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri
370
371                 empty_uploadable = Data("", self._client.convergence)
372                 d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
373                                                    metadata=metadata, overwrite=True)
374
375                 def _add_db_entry(filenode):
376                     filecap = filenode.get_uri()
377                     last_downloaded_uri = metadata.get('last_downloaded_uri', None)
378                     self._db.did_upload_version(relpath_u, new_version, filecap,
379                                                 last_downloaded_uri, last_downloaded_timestamp,
380                                                 pathinfo)
381                     self._count('files_uploaded')
382                 d2.addCallback(_add_db_entry)
383                 return d2
384             elif pathinfo.islink:
385                 self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
386                 return None
387             elif pathinfo.isdir:
388                 if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
389                     self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)
390
391                 uploadable = Data("", self._client.convergence)
392                 encoded_path_u += magicpath.path2magic(u"/")
393                 self._log("encoded_path_u =  %r" % (encoded_path_u,))
394                 upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
395                 def _dir_succeeded(ign):
396                     self._log("created subdirectory %r" % (relpath_u,))
397                     self._count('directories_created')
398                 def _dir_failed(f):
399                     self._log("failed to create subdirectory %r" % (relpath_u,))
400                     return f
401                 upload_d.addCallbacks(_dir_succeeded, _dir_failed)
402                 upload_d.addCallback(lambda ign: self._scan(relpath_u))
403                 upload_d.addCallback(lambda ign: self._extend_queue_and_keep_going(self._pending))
404                 return upload_d
405             elif pathinfo.isfile:
406                 db_entry = self._db.get_db_entry(relpath_u)
407
408                 last_downloaded_timestamp = now
409
410                 if db_entry is None:
411                     new_version = 0
412                 elif is_new_file(pathinfo, db_entry):
413                     new_version = db_entry.version + 1
414                 else:
415                     self._log("Not uploading %r" % (relpath_u,))
416                     self._count('objects_not_uploaded')
417                     return None
418
419                 metadata = { 'version': new_version,
420                              'last_downloaded_timestamp': last_downloaded_timestamp }
421                 if db_entry is not None and db_entry.last_downloaded_uri is not None:
422                     metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri
423
424                 uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
425                 d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
426                                                    metadata=metadata, overwrite=True)
427
428                 def _add_db_entry(filenode):
429                     filecap = filenode.get_uri()
430                     last_downloaded_uri = metadata.get('last_downloaded_uri', None)
431                     self._db.did_upload_version(relpath_u, new_version, filecap,
432                                                 last_downloaded_uri, last_downloaded_timestamp,
433                                                 pathinfo)
434                     self._count('files_uploaded')
435                 d2.addCallback(_add_db_entry)
436                 return d2
437             else:
438                 self.warn("WARNING: cannot process special file %s" % quote_filepath(fp))
439                 return None
440
441         d.addCallback(_maybe_upload)
442
443         def _succeeded(res):
444             self._count('objects_succeeded')
445             return res
446         def _failed(f):
447             self._count('objects_failed')
448             self._log("%s while processing %r" % (f, relpath_u))
449             return f
450         d.addCallbacks(_succeeded, _failed)
451         return d
452
453     def _get_metadata(self, encoded_path_u):
454         try:
455             d = self._upload_dirnode.get_metadata_for(encoded_path_u)
456         except KeyError:
457             return Failure()
458         return d
459
460     def _get_filenode(self, encoded_path_u):
461         try:
462             d = self._upload_dirnode.get(encoded_path_u)
463         except KeyError:
464             return Failure()
465         return d
466
467
class WriteFileMixin(object):
    """
    Helpers for writing downloaded file contents to the local filesystem,
    handling overwrite-vs-conflict renaming and deletion backups.
    """
    # Replacement files get an mtime this many seconds in the past, so a
    # subsequent local edit is guaranteed to look newer.
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        """Write downloaded contents to abspath_u; return the path actually written.

        On conflict (requested, or detected during replacement) the new
        contents land at '<abspath>.conflict' instead of abspath_u.
        """
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)

        # Apply the magic-folder umask while creating dirs and the temp
        # file, restoring the process umask afterwards.
        old_mask = os.umask(self._umask)
        try:
            fileutil.make_dirs(head, (~ self._umask) & 0777)
            fileutil.write(replacement_path_u, file_contents)
        finally:
            os.umask(old_mask)

        # atime=now, mtime backdated by FUDGE_SECONDS (see step 3 above).
        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                # Replacement failed: reclassify as a conflict (step 4).
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        """Move the replacement file aside to '<abspath>.conflict'; return that path."""
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
        if os.path.isfile(replacement_path_u):
            print "%r exists" % (replacement_path_u,)
        if os.path.isfile(conflict_path_u):
            print "%r exists" % (conflict_path_u,)

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        """Rename a remotely-deleted file to '<abspath>.backup' (best-effort)."""
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except OSError:
            # Already missing locally; nothing to back up.
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u
535
536
class Downloader(QueueMixin, WriteFileMixin):
    """
    Periodically scans the collective DMD for files published by other
    magic-folder participants and downloads newer versions into the local
    directory, detecting conflicts against local state.
    """
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode,
                 upload_readonly_dircap, clock, is_upload_pending, umask):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        # Our own DMD's readonly URI; used to skip our own directory when scanning.
        self._upload_readonly_dircap = upload_readonly_dircap
        # Callable (Uploader.is_pending) used for conflict detection.
        self._is_upload_pending = is_upload_pending
        self._umask = umask

    def start_downloading(self):
        """Do an initial scan (including our own DMD) and start turning the queue."""
        self._log("start_downloading")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective(scan_self=True)
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        self._turn_deque()
        return d

    def stop(self):
        """Stop turning the queue; fires when in-flight work completes."""
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        db_entry = self._db.get_db_entry(relpath_u)
        if db_entry is None:
            return True
        self._log("version %r" % (db_entry.version,))
        return (db_entry.version < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode path string checks to see if this file object
        exists in our magic-folder db; if not then return None
        else check for an entry in our magic-folder db and return the version number.
        """
        # NOTE: also returns None when the file is absent on disk, even if
        # the db has an entry for it.
        if not self._get_filepath(relpath_u).exists():
            return None
        db_entry = self._db.get_db_entry(relpath_u)
        return None if db_entry is None else db_entry.version

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with the two tuple containing a
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                # dir_name bound as a default to avoid late-binding closure bugs.
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            # Pick the (node, metadata) pair with the largest 'version';
            # (None, None) if no participant has the file.
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
        """List one participant's DMD and add download candidates to scan_batch.

        scan_batch maps relpath_u -> list of (filenode, metadata) seen for
        that path across participants; it is mutated in place.
        """
        self._log("_scan_remote_dmd nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                # Candidate when we have no local version, the remote lacks
                # a version number, or the remote is strictly newer.
                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    if scan_batch.has_key(relpath_u):
                        scan_batch[relpath_u] += [(file_node, metadata)]
                    else:
                        scan_batch[relpath_u] = [(file_node, metadata)]

        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")
        return d

    def _scan_remote_collective(self, scan_self=False):
        """Scan every participant's DMD (optionally including our own) and
        queue the newest downloadable version of each changed path."""
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                # Skip our own DMD unless scan_self was requested.
                if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)

            return d2
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            # For each path keep only the highest-version candidate, then
            # queue it if it beats our db version.
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    self._deque.append( (relpath_u, file_node, metadata) )
                else:
                    self._log("Excluding %r" % (relpath_u,))
                    self._call_hook(None, 'processed')

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
        return d

    def _when_queue_is_empty(self):
        # Idle: wait REMOTE_SCAN_INTERVAL, rescan, and resume turning.
        d = task.deferLater(self._clock, self.REMOTE_SCAN_INTERVAL, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        # Downloader
        """Download one queued (relpath_u, file_node, metadata) item.

        Classifies the write as overwrite or conflict by comparing the DMD
        metadata URIs against our db entry, writes (or deletes/mkdirs) the
        local file, then records the new version in the db.
        """
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            # A previous conflict is still unresolved; refuse to touch the file.
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
            # Conflict detection: a mismatch between the DMD's record of the
            # last downloaded/uploaded URIs and ours, or a pending local
            # upload of the same path, means concurrent divergent edits.
            is_conflict = False
            db_entry = self._db.get_db_entry(relpath_u)
            dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
            dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            if db_entry:
                if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')
                elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
                    is_conflict = True
                    self._count('objects_conflicted')
                elif self._is_upload_pending(relpath_u):
                    is_conflict = True
                    self._count('objects_conflicted')

            # A trailing u"/" marks a directory entry rather than a file.
            if relpath_u.endswith(u"/"):
                if metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: file_node.download_best_version())
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            # Conflicts are handled (file written aside), not fatal.
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d