import sys, os
import os.path
from collections import deque
import time

from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
from allmydata.util.assertutil import precondition, _assert
from allmydata.util.deferredutil import HookMixin
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
     extend_filepath, unicode_from_filepath, unicode_segments_from, \
     quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
from allmydata import magicfolderdb, magicpath

defer.setDebugging(True)
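# IN_EXCL_UNLINK ("don't generate events for children after they have been
# unlinked") is an inotify flag that Twisted's inotify wrapper does not
# export, so its value from <sys/inotify.h> is hardcoded here.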
IN_EXCL_UNLINK = 0x04000000L

def get_inotify_module():
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise


def is_new_file(pathinfo, db_entry):
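    """
    Return True if the file should be treated as new (and so uploaded): either
    there is no db entry for it, or its (size, ctime, mtime) differ from those
    recorded in the db. A path that no longer exists and whose db entry already
    records a deletion (size is None) is not considered new.
    """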
    if db_entry is None:
        return True

    if not pathinfo.exists and db_entry.size is None:
        return False

    return ((pathinfo.size, pathinfo.ctime, pathinfo.mtime) !=
            (db_entry.size, db_entry.ctime, db_entry.mtime))


class MagicFolder(service.MultiService):
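    """
    Top-level service for a single magic folder: it glues together an Uploader
    (local filesystem -> our upload DMD) and a Downloader (collective DMDs ->
    local filesystem).

    A minimal usage sketch (illustrative only -- the caps and db file would
    normally come from 'tahoe magic-folder create'/'join' and the node's
    private directory):

        mf = MagicFolder(client, upload_dircap, collective_dircap,
                         local_path_u=u"/home/alice/magic",
                         dbfile="magicfolder.sqlite", umask=0077)
        mf.setServiceParent(client)
        mf.ready()  # start processing the upload/download queues
    """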
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, umask,
                 pending_delay=1.0, clock=None):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        immediate = clock is not None
        clock = clock or reactor
        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            # raising (rather than returning a Failure, which __init__ would
            # silently discard) ensures the caller actually sees this error
            raise Exception("Unable to load magic folder db.")

        # for tests
        self._client = client
        self._db = db

        upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
        self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
                                     upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending, umask)

    def startService(self):
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        log.msg("%r.startService" % (self,))
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """ready is used to signal us to start
        processing the upload and download queues.
        """
        self.uploader.start_uploading()  # synchronous
        return self.downloader.start_downloading()

    def finish(self):
        log.msg("MagicFolder.finish")
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        return service.MultiService.disownServiceParent(self)


class QueueMixin(HookMixin):
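    """
    Machinery shared by the Uploader and the Downloader: a deque of pending
    work items and a 'lazy tail' Deferred (self._lazy_tail) onto which each
    _process() call is chained, so that items are handled strictly one at a
    time. _turn_deque() takes the next item off the deque and extends the
    chain; when the deque is empty, _when_queue_is_empty() decides what to do
    next (the Uploader idles, the Downloader schedules a remote rescan).
    """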
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        self._lazy_tail = defer.succeed(None)
        self._stopped = False
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s

    def _turn_deque(self):
        try:
            self._log("_turn_deque")
            if self._stopped:
                self._log("stopped")
                return
            try:
                item = self._deque.pop()
                self._log("popped %r" % (item,))
                self._count('objects_queued', -1)
            except IndexError:
                self._log("deque is now empty")
                self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
            else:
                def _log_tail_result(result):
                    self._log("tail result %r" % (result,))
                    return result
                self._lazy_tail.addBoth(_log_tail_result)
                self._lazy_tail.addCallback(lambda ign: self._process(item))
                self._lazy_tail.addBoth(self._call_hook, 'processed')
                self._lazy_tail.addErrback(log.err)
                self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
        except Exception as e:
            self._log("turn deque exception %s" % (e,))
            raise


class Uploader(QueueMixin):
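    """
    Watches the local directory for filesystem events (via the module chosen
    by get_inotify_module), queues the paths that change, and uploads each one
    to this client's upload DMD. A periodic full scan (every
    _periodic_full_scan_duration seconds) catches anything the notifier missed.
    """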
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
                 immediate=False):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False
        self._immediate = immediate

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()
        self._pending = set()  # of unicode relpaths

        self._periodic_full_scan_duration = 10 * 60 # perform a full scan every 10 minutes

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        # the periodic full-scan call may never have been scheduled (if
        # start_uploading was not called) or may have already fired
        if hasattr(self, 'periodic_callid') and self.periodic_callid.active():
            self.periodic_callid.cancel()
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_uploading(self):
        self._log("start_uploading")
        self.is_ready = True

        all_relpaths = self._db.get_all_relpaths()
        self._log("all relpaths: %r" % (all_relpaths,))

        for relpath_u in all_relpaths:
            self._add_pending(relpath_u)

        self._full_scan()

    def _extend_queue_and_keep_going(self, relpaths_u):
        self._log("_extend_queue_and_keep_going %r" % (relpaths_u,))
        self._deque.extend(relpaths_u)
        self._count('objects_queued', len(relpaths_u))

        if self.is_ready:
            if self._immediate:  # for tests
                self._turn_deque()
            else:
                self._clock.callLater(0, self._turn_deque)

    def _full_scan(self):
        self.periodic_callid = self._clock.callLater(self._periodic_full_scan_duration, self._full_scan)
        self._log("full scan")
        self._log("_pending %r" % (self._pending,))
        self._scan(u"")
        self._extend_queue_and_keep_going(self._pending)

    def _add_pending(self, relpath_u):
        self._log("add pending %r" % (relpath_u,))
        if not magicpath.should_ignore_file(relpath_u):
            self._pending.add(relpath_u)

    def _scan(self, reldir_u):
        # Scan a directory by (synchronously) adding the paths of all its children to self._pending.
        # Note that this doesn't add them to the deque -- that is done
        # separately, by _extend_queue_and_keep_going.

        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        for child in children:
            _assert(isinstance(child, unicode), child=child)
            self._add_pending(u"%s/%s" % (reldir_u, child) if reldir_u != u"" else child)

    def is_pending(self, relpath_u):
        return relpath_u in self._pending

    def _notify(self, opaque, path, events_mask):
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
            return
        if relpath_u in self._pending:
            self._log("not queueing %r because it is already pending" % (relpath_u,))
            return
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
            return

        self._pending.add(relpath_u)
        self._extend_queue_and_keep_going([relpath_u])

    def _when_queue_is_empty(self):
        return defer.succeed(None)

    def _process(self, relpath_u):
        # Uploader
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            self._log("_maybe_upload(%r, now=%r)" % (val, now))
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("about to remove %r from pending set %r" %
                      (relpath_u, self._pending))
            # discard() rather than remove(): a full scan can queue a path
            # that is already queued, so it may have been removed already
            self._pending.discard(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)
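            # path2magic flattens the relative path into a single DMD child
            # name; in the current magicpath scheme u"/" is escaped as u"@_",
            # so e.g. u"dir/file" becomes u"dir@_file"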

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')

                db_entry = self._db.get_db_entry(relpath_u)
                if db_entry is None:
                    return None

                last_downloaded_timestamp = now  # is this correct?

                if is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                self._log("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                self._log("isdir %r" % (relpath_u,))
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u = %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _dir_succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _dir_failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_dir_succeeded, _dir_failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                upload_d.addCallback(lambda ign: self._extend_queue_and_keep_going(self._pending))
                return upload_d
            elif pathinfo.isfile:
                db_entry = self._db.get_db_entry(relpath_u)

                last_downloaded_timestamp = now

                if db_entry is None:
                    new_version = 0
                elif is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry is not None and db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                self._log("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d


class WriteFileMixin(object):
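    """
    Helpers for writing downloaded content to the local filesystem safely:
    write to a temporary file, then atomically replace the target (keeping a
    .backup copy), diverting to a .conflict file whenever the replacement
    cannot be done cleanly. Written files get an mtime FUDGE_SECONDS in the
    past so that a subsequent local write still registers as a change even
    with coarse timestamp granularity.
    """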
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)

        old_mask = os.umask(self._umask)
        try:
            fileutil.make_dirs(head, (~ self._umask) & 0777)
            fileutil.write(replacement_path_u, file_contents)
        finally:
            os.umask(old_mask)

        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            self._log("is conflict; calling _rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        if os.path.isfile(replacement_path_u):
            self._log("%r exists" % (replacement_path_u,))
        if os.path.isfile(conflict_path_u):
            self._log("%r exists" % (conflict_path_u,))

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except OSError:
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u


class Downloader(QueueMixin, WriteFileMixin):
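    """
    Periodically scans every DMD in the collective (normally excluding our
    own), queues any file whose remote version is newer than the one in our
    magic-folder db, downloads it, and writes it out via WriteFileMixin,
    marking a .conflict instead of overwriting when the local and remote
    histories disagree.
    """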
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode,
                 upload_readonly_dircap, clock, is_upload_pending, umask):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        self._upload_readonly_dircap = upload_readonly_dircap
        self._is_upload_pending = is_upload_pending
        self._umask = umask

    def start_downloading(self):
        self._log("start_downloading")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective(scan_self=True)
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        self._turn_deque()
        return d

    def stop(self):
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        Return True if the remote object should be downloaded: the remote
        metadata version is checked against the version in our magic-folder
        db, and the latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignoring %r" % (relpath_u,))
            return False
        db_entry = self._db.get_db_entry(relpath_u)
        if db_entry is None:
            return True
        self._log("version %r" % (db_entry.version,))
        return (db_entry.version < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        Return the version number recorded in our magic-folder db for the
        given unicode relative path, or None if the file does not exist
        locally or has no db entry.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        db_entry = self._db.get_db_entry(relpath_u)
        return None if db_entry is None else db_entry.version

    def _get_collective_latest_file(self, filename):
        """
        Take a DMD child name for a file managed by magic-folder and return a
        Deferred that fires with a (file node, metadata) pair for the latest
        version of that file found in the magic-folder collective directory,
        or (None, None) if no version was found.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            max_version = None
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    # max_version starts as None (not 0) so that a file whose
                    # only version is 0 is still returned
                    if max_version is None or result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
        self._log("_scan_remote_dmd nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    scan_batch.setdefault(relpath_u, []).append((file_node, metadata))

        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")
        return d

    def _scan_remote_collective(self, scan_self=False):
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)

            return d2
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    self._deque.append( (relpath_u, file_node, metadata) )
                else:
                    self._log("Excluding %r" % (relpath_u,))
                    self._call_hook(None, 'processed')

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
        return d

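    # When the download queue drains we don't go idle: we schedule another
    # scan of the remote collective, so the downloader effectively polls for
    # new remote versions every REMOTE_SCAN_INTERVAL seconds.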
    def _when_queue_is_empty(self):
        d = task.deferLater(self._clock, self.REMOTE_SCAN_INTERVAL, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        # Downloader
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
            is_conflict = False
            db_entry = self._db.get_db_entry(relpath_u)
            dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
            dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
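            # Treat it as a conflict if the DMD's record of the last
            # downloaded/uploaded version no longer matches what our db
            # recorded, or if our own upload of the same path is pending;
            # otherwise the remote change can safely overwrite.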
            if db_entry:
                if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')
                elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
                    is_conflict = True
                    self._count('objects_conflicted')
                elif self._is_upload_pending(relpath_u):
                    is_conflict = True
                    self._count('objects_conflicted')

            if relpath_u.endswith(u"/"):
                if metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: file_node.download_best_version())
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d