# src/allmydata/frontends/magic_folder.py (tahoe-lafs),
# commit 15ebf518aaff298210a752da4712c2a8d8c57cd3
2 import sys, os
3 import os.path
4 from collections import deque
5 import time
6
7 from twisted.internet import defer, reactor, task
8 from twisted.python.failure import Failure
9 from twisted.python import runtime
10 from twisted.application import service
11
12 from allmydata.util import fileutil
13 from allmydata.interfaces import IDirectoryNode
14 from allmydata.util import log
15 from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
16 from allmydata.util.assertutil import precondition, _assert
17 from allmydata.util.deferredutil import HookMixin
18 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
19      extend_filepath, unicode_from_filepath, unicode_segments_from, \
20      quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
21 from allmydata.immutable.upload import FileName, Data
22 from allmydata import magicfolderdb, magicpath
23
24
# inotify flag (from <sys/inotify.h>): stop generating events for children
# after they have been unlinked from the watched directory.  Twisted's
# inotify module does not expose it, so it is defined here.  The py2-only
# 'L' (long) suffix has been dropped: the value is identical and the bitwise
# OR into the watch mask behaves the same.
IN_EXCL_UNLINK = 0x04000000
26
def get_inotify_module():
    """Return the filesystem-notification module for this platform.

    On Windows this is allmydata.windows.inotify; on platforms where Twisted
    supports inotify it is twisted.internet.inotify.  Raises
    NotImplementedError (or re-raises the import failure) when no
    implementation is available.
    """
    on_windows = (sys.platform == "win32")
    try:
        if on_windows:
            from allmydata.windows import inotify
            return inotify
        if runtime.platform.supportsINotify():
            from twisted.internet import inotify
            return inotify
        raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                  "This currently requires Linux or Windows.")
    except (ImportError, AttributeError) as e:
        # Log the underlying failure before deciding how to report it.
        log.msg(e)
        if on_windows:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise
43
44
def is_new_file(pathinfo, db_entry):
    """Decide whether the local file differs from what the db last recorded.

    Returns True when there is no db entry at all, False when the file is
    absent on disk and the db already records it as deleted (size is None),
    and otherwise compares the (size, ctime, mtime) triples.
    """
    if db_entry is None:
        # never seen this path before
        return True

    if not pathinfo.exists and db_entry.size is None:
        # gone from disk, and the db already says so
        return False

    current = (pathinfo.size, pathinfo.ctime, pathinfo.mtime)
    recorded = (db_entry.size, db_entry.ctime, db_entry.mtime)
    return current != recorded
54
55
56 class MagicFolder(service.MultiService):
57     name = 'magic-folder'
58
59     def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, umask,
60                  pending_delay=1.0, clock=None):
61         precondition_abspath(local_path_u)
62
63         service.MultiService.__init__(self)
64
65         immediate = clock is not None
66         clock = clock or reactor
67         db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
68         if db is None:
69             return Failure(Exception('ERROR: Unable to load magic folder db.'))
70
71         # for tests
72         self._client = client
73         self._db = db
74
75         upload_dirnode = self._client.create_node_from_uri(upload_dircap)
76         collective_dirnode = self._client.create_node_from_uri(collective_dircap)
77
78         self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
79         self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
80                                      upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending, umask)
81
82     def startService(self):
83         # TODO: why is this being called more than once?
84         if self.running:
85             return defer.succeed(None)
86         print "%r.startService" % (self,)
87         service.MultiService.startService(self)
88         return self.uploader.start_monitoring()
89
90     def ready(self):
91         """ready is used to signal us to start
92         processing the upload and download items...
93         """
94         d = self.uploader.start_scanning()
95         d2 = self.downloader.start_scanning()
96         d.addCallback(lambda ign: d2)
97         return d
98
99     def finish(self):
100         print "finish"
101         d = self.uploader.stop()
102         d2 = self.downloader.stop()
103         d.addCallback(lambda ign: d2)
104         return d
105
106     def remove_service(self):
107         return service.MultiService.disownServiceParent(self)
108
109
class QueueMixin(HookMixin):
    """Shared machinery for Uploader and Downloader.

    Maintains a deque of pending work items which are processed one at a
    time by chaining callbacks onto a single Deferred tail
    (``self._lazy_tail``), plus logging and stats-counting helpers.

    Subclasses must provide ``_process(item)`` and
    ``_when_queue_is_empty()``.  ``set_hook``/``_call_hook`` come from
    HookMixin and are used by tests to observe progress.
    """

    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name    # 'uploader' or 'downloader'; used in log and counter names
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        # Fail fast if the configured local directory is missing or not a
        # directory — a misconfiguration the user must fix.
        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        # All queue processing is serialized through this Deferred chain.
        self._lazy_tail = defer.succeed(None)
        self._stopped = False
        self._turn_delay = 0   # seconds between queue turns; Downloader overrides

    def _get_filepath(self, relpath_u):
        """Map a u'/'-separated relative path to a FilePath under the local dir."""
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        """Inverse of _get_filepath: FilePath -> u'/'-separated relative path."""
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        """Bump the stats counter 'magic_folder.<name>.<counter_name>' by delta."""
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        # Deferred-callback-shaped logger: logs and passes the result through.
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _turn_deque(self):
        """Take one item off the deque and chain its processing onto _lazy_tail.

        Note: deque.pop() takes from the right, i.e. the most recently
        queued item is processed first.  When the deque is empty,
        _when_queue_is_empty() decides what happens next (e.g. the
        Downloader schedules another remote scan).
        """
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            # fire the 'processed' test hook on success or failure
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            # schedule the next turn after _turn_delay seconds
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
177
178
class Uploader(QueueMixin):
    """Watches the local magic-folder directory with inotify and uploads
    creations, modifications and deletions of files/directories into the
    client's writeable upload DMD (directory node).
    """

    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
                 immediate=False):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False
        # immediate=True is used by tests: turn the deque synchronously from
        # _notify instead of scheduling a reactor turn.
        self._immediate = immediate

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()
        # relpaths that have been noticed (by scan or inotify) but not yet
        # fully processed; used to suppress duplicate queueing
        self._pending = set()

        # not every inotify implementation exposes this knob
        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        """Begin reading inotify events; fires the 'started' test hook."""
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        """Stop inotify monitoring; fires when any in-flight work has drained."""
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        # only some implementations support waiting for shutdown
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        # wait for the processing chain to finish
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        """Do the initial full scan of the local directory, then start turning
        the queue.  Seeds self._pending with every relpath known to the db so
        that files deleted while we weren't running are noticed."""
        self._log("start_scanning")
        self.is_ready = True
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending))
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already processed
            # (normally because they have been deleted on disk).
            self._log("adding %r" % (self._pending))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
        """Recursively process every entry under the given relative directory.

        Raises on unlistable directories (permissions, bad filename
        encoding).  Each non-ignored child is added to _pending and handed
        straight to _process.
        """
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            _assert(isinstance(child, unicode), child=child)
            # child=child binds the loop variable at definition time
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def is_pending(self, relpath_u):
        """True if this relpath has been noticed but not yet uploaded
        (used by the Downloader for conflict detection)."""
        return relpath_u in self._pending

    def _notify(self, opaque, path, events_mask):
        """inotify callback: filter the event and queue the relpath for upload."""
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
            return
        if relpath_u in self._pending:
            self._log("ignoring event for %r (already pending)" % (relpath_u,))
            return
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
            return

        self._log("appending %r to deque" % (relpath_u,))
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        # don't process anything until start_scanning() has been called
        if self.is_ready:
            if self._immediate:  # for tests
                self._turn_deque()
            else:
                self._clock.callLater(0, self._turn_deque)

    def _when_queue_is_empty(self):
        # the Uploader is purely event-driven, so there is nothing to do here
        return defer.succeed(None)

    def _process(self, relpath_u):
        # Uploader
        """Upload one relpath: a deletion marker if it vanished, a directory
        entry (plus recursive scan) for a dir, or the file contents for a
        regular file.  Bumps success/failure counters either way."""
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            # _scan's _add_pending returns None for ignored paths
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            # once we start processing it, it is no longer pending
            self._log("about to remove %r from pending set %r" %
                      (relpath_u, self._pending))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')

                db_entry = self._db.get_db_entry(relpath_u)
                if db_entry is None:
                    # never uploaded, nothing to mark deleted
                    return None

                last_downloaded_timestamp = now  # is this correct?

                if is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                # a deletion is represented remotely by an empty file with
                # 'deleted': True in its metadata
                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                # watch the new subdirectory unless the notifier already
                # covers newly-created subdirectories recursively
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                # a directory is represented remotely by an entry whose
                # encoded name carries a trailing (magic-encoded) slash
                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u =  %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                # recurse into the new directory
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                db_entry = self._db.get_db_entry(relpath_u)

                last_downloaded_timestamp = now

                if db_entry is None:
                    new_version = 0
                elif is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry is not None and db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                # not a file, dir, or symlink (e.g. a fifo or device node)
                self.warn("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        """Return a Deferred for the remote entry's metadata (Failure on KeyError)."""
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            # Failure() with no args captures the current exception
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        """Return a Deferred for the remote entry's filenode (Failure on KeyError)."""
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d
457
458
459 class WriteFileMixin(object):
460     FUDGE_SECONDS = 10.0
461
462     def _get_conflicted_filename(self, abspath_u):
463         return abspath_u + u".conflict"
464
465     def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
466         self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
467                   % (abspath_u, len(file_contents), is_conflict, now))
468
469         # 1. Write a temporary file, say .foo.tmp.
470         # 2. is_conflict determines whether this is an overwrite or a conflict.
471         # 3. Set the mtime of the replacement file to be T seconds before the
472         #    current local time.
473         # 4. Perform a file replacement with backup filename foo.backup,
474         #    replaced file foo, and replacement file .foo.tmp. If any step of
475         #    this operation fails, reclassify as a conflict and stop.
476         #
477         # Returns the path of the destination file.
478
479         precondition_abspath(abspath_u)
480         replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
481         backup_path_u = abspath_u + u".backup"
482         if now is None:
483             now = time.time()
484
485         # ensure parent directory exists
486         head, tail = os.path.split(abspath_u)
487
488         try:
489             old_mask = os.umask(self._umask)
490             fileutil.make_dirs(head, (~ self._umask) & 0777)
491             fileutil.write(replacement_path_u, file_contents)
492         finally:
493             os.umask(old_mask)
494
495         os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
496         if is_conflict:
497             print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
498             return self._rename_conflicted_file(abspath_u, replacement_path_u)
499         else:
500             try:
501                 fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
502                 return abspath_u
503             except fileutil.ConflictError:
504                 return self._rename_conflicted_file(abspath_u, replacement_path_u)
505
506     def _rename_conflicted_file(self, abspath_u, replacement_path_u):
507         self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))
508
509         conflict_path_u = self._get_conflicted_filename(abspath_u)
510         print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
511         if os.path.isfile(replacement_path_u):
512             print "%r exists" % (replacement_path_u,)
513         if os.path.isfile(conflict_path_u):
514             print "%r exists" % (conflict_path_u,)
515
516         fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
517         return conflict_path_u
518
519     def _rename_deleted_file(self, abspath_u):
520         self._log('renaming deleted file to backup: %s' % (abspath_u,))
521         try:
522             fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
523         except OSError:
524             self._log("Already gone: '%s'" % (abspath_u,))
525         return abspath_u
526
527
class Downloader(QueueMixin, WriteFileMixin):
    """Periodically scans every DMD in the read-only collective directory,
    queues remote files whose version is newer than the local db's, and
    writes them to the local directory (detecting conflicts)."""

    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode,
                 upload_readonly_dircap, clock, is_upload_pending, umask):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        # our own DMD's readonly URI, used to skip ourselves when scanning
        self._upload_readonly_dircap = upload_readonly_dircap
        # callable (Uploader.is_pending) used for conflict detection
        self._is_upload_pending = is_upload_pending
        self._umask = umask
        self._turn_delay = self.REMOTE_SCAN_INTERVAL

    def start_scanning(self):
        """Do the first remote scan (including our own DMD) and start the queue."""
        self._log("start_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective(scan_self=True)
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        self._turn_deque()
        return d

    def stop(self):
        """Stop turning the queue; fires when in-flight work has drained."""
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        db_entry = self._db.get_db_entry(relpath_u)
        if db_entry is None:
            # we have never seen it, so download it
            return True
        self._log("version %r" % (db_entry.version,))
        return (db_entry.version < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode path string checks to see if this file object
        exists in our magic-folder db; if not then return None
        else check for an entry in our magic-folder db and return the version number.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        db_entry = self._db.get_db_entry(relpath_u)
        return None if db_entry is None else db_entry.version

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with the two tuple containing a
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                # dir_name=dir_name binds the loop variable at definition time
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            # consumeErrors: a missing entry in one DMD is not fatal
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            # pick the (node, metadata) pair with the largest 'version';
            # (None, None) if no DMD had the file
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
        """List one participant's DMD and accumulate (filenode, metadata)
        candidates for every entry newer than our local version into
        scan_batch (mutated in place)."""
        self._log("_scan_remote_dmd nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    if scan_batch.has_key(relpath_u):
                        scan_batch[relpath_u] += [(file_node, metadata)]
                    else:
                        scan_batch[relpath_u] = [(file_node, metadata)]

        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")
        return d

    def _scan_remote_collective(self, scan_self=False):
        """Scan all participants' DMDs (skipping our own unless scan_self) and
        queue the highest-version candidate for each path that should be
        downloaded."""
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                # skip our own DMD (compared by readonly URI) unless asked
                if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)

            return d2
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                # of all candidates for this path, keep the highest version
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    self._deque.append( (relpath_u, file_node, metadata) )
                else:
                    self._log("Excluding %r" % (relpath_u,))
                    # still fire the test hook so tests can count decisions
                    self._call_hook(None, 'processed')

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
        return d

    def _when_queue_is_empty(self):
        # poll the remote collective again after _turn_delay seconds,
        # then resume turning the queue
        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        # Downloader
        """Process one queued (relpath_u, file_node, metadata) item: decide
        overwrite vs conflict, apply the change to the local filesystem
        (mkdir, delete-rename, or download+write), then record it in the db.
        ConflictError is swallowed at the end (already counted)."""
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            # NOTE(review): despite the name, did_upload_version is also how
            # downloads are recorded in the magic-folder db.
            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            # a previous conflict is still unresolved; refuse to touch the file
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
            # conflict detection: the remote change conflicts with local state
            # if the DMD's last_downloaded_uri or last_uploaded_uri disagrees
            # with our db, or if we have an upload of this path still pending
            is_conflict = False
            db_entry = self._db.get_db_entry(relpath_u)
            dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
            dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            if db_entry:
                if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')
                elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
                    is_conflict = True
                    self._count('objects_conflicted')
                elif self._is_upload_pending(relpath_u):
                    is_conflict = True
                    self._count('objects_conflicted')

            if relpath_u.endswith(u"/"):
                # trailing slash means the remote entry is a directory
                if metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: file_node.download_best_version())
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            # conflicts are handled (file renamed aside), not propagated
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d