]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/frontends/magic_folder.py
Add download.umask config option with default of 077
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / magic_folder.py
1
2 import sys, os
3 import os.path
4 from collections import deque
5 import time
6
7 from twisted.internet import defer, reactor, task
8 from twisted.python.failure import Failure
9 from twisted.python import runtime
10 from twisted.application import service
11
12 from allmydata.util import fileutil
13 from allmydata.interfaces import IDirectoryNode
14 from allmydata.util import log
15 from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
16 from allmydata.util.assertutil import precondition, _assert
17 from allmydata.util.deferredutil import HookMixin
18 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
19      extend_filepath, unicode_from_filepath, unicode_segments_from, \
20      quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
21 from allmydata.immutable.upload import FileName, Data
22 from allmydata import magicfolderdb, magicpath
23
24
# inotify's IN_EXCL_UNLINK flag (from <sys/inotify.h>): don't generate
# events for watched-directory children after they have been unlinked.
# Twisted's inotify module does not expose this constant, so the raw
# value is defined here and OR'd into the watch mask below.
# (The 'L' long-literal suffix was dropped: it is unnecessary in
# Python 2 -- the value fits in a plain int -- and a syntax error in
# Python 3.)
IN_EXCL_UNLINK = 0x04000000
26
def get_inotify_module():
    """
    Return a filesystem-notification module appropriate for this platform:
    allmydata's own shim on Windows, Twisted's inotify wrapper where the
    runtime reports inotify support (Linux).  Raises NotImplementedError
    on unsupported platforms, or when the Windows shim fails to import.
    """
    on_windows = (sys.platform == "win32")
    try:
        if on_windows:
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        # The platform-specific import itself can fail; log the underlying
        # reason before deciding what to surface to the caller.
        log.msg(e)
        if on_windows:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise
43
44
def is_new_file(pathinfo, db_entry):
    """
    Decide whether the on-disk state of a file differs from what the
    magic-folder db last recorded, i.e. whether it needs (re)uploading.

    pathinfo: pathinfo object for the local file (has .exists, .size,
              .ctime, .mtime)
    db_entry: the db row for this relpath, or None if never recorded
    """
    # Never recorded: definitely new.
    if db_entry is None:
        return True

    # Absent on disk and recorded with size None (deleted): nothing new.
    if not pathinfo.exists and db_entry.size is None:
        return False

    # Otherwise, any change in (size, ctime, mtime) counts as new.
    on_disk = (pathinfo.size, pathinfo.ctime, pathinfo.mtime)
    recorded = (db_entry.size, db_entry.ctime, db_entry.mtime)
    return on_disk != recorded
54
55
class MagicFolder(service.MultiService):
    """
    Top-level service for one magic folder: constructs and owns the
    Uploader and Downloader that synchronize a local directory with the
    Tahoe collective/upload DMDs.
    """
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, umask,
                 pending_delay=1.0, clock=None):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        # A caller-supplied clock means we are under test; in that case
        # queued items are processed immediately rather than via callLater.
        immediate = clock is not None
        clock = clock or reactor
        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            # BUG FIX: this used to be 'return Failure(...)'.  Returning a
            # non-None value from __init__ only makes Python raise an
            # unhelpful TypeError; raise the real error directly instead.
            raise Exception('ERROR: Unable to load magic folder db.')

        # for tests
        self._client = client
        self._db = db

        upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
        self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
                                     upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending, umask)

    def startService(self):
        """Start the service and begin inotify monitoring (idempotent)."""
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        print("%r.startService" % (self,))
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """ready is used to signal us to start
        processing the upload and download items...
        """
        d = self.uploader.start_scanning()
        d2 = self.downloader.start_scanning()
        d.addCallback(lambda ign: d2)
        return d

    def finish(self):
        """Stop both queues; fires when both have drained."""
        print("finish")
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        """Detach this service from its parent."""
        return service.MultiService.disownServiceParent(self)
108
109
class QueueMixin(HookMixin):
    """
    Shared machinery for Uploader and Downloader: a deque of work items
    drained one item per 'turn' through a single chain of Deferreds
    (self._lazy_tail), so processing is strictly serialized.  Also
    provides stats counting and the 'processed'/'started' test hooks.
    """
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        # 'uploader' or 'downloader'; used in log messages and stat names.
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        # Fail early, with a readable configuration error, if the local
        # directory is missing or is not actually a directory.
        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        # The tail of the serialized processing chain; every new piece of
        # work is appended to this Deferred.
        self._lazy_tail = defer.succeed(None)
        self._stopped = False
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        """Map a u'/'-separated relative path to a FilePath under our root."""
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        """Inverse of _get_filepath: FilePath -> u'/'-separated relpath."""
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        """Bump a stats-provider counter namespaced by queue name."""
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        # Callback-friendly logger: logs and passes the result through
        # unchanged, so it can be inserted into a Deferred chain.
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _turn_deque(self):
        """
        Take one turn: pop one item (note: .pop() takes from the right,
        i.e. most-recently-queued first) and chain its processing onto
        _lazy_tail.  When the deque is empty, defer to the subclass's
        _when_queue_is_empty().
        """
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            # Process, fire the 'processed' hook (even on failure), log
            # errors, then schedule the next turn after _turn_delay.
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
178
class Uploader(QueueMixin):
    """
    Watches the local directory via inotify (or the Windows equivalent)
    and uploads created/modified/deleted files and directories into our
    upload DMD (a writeable Tahoe directory node).
    """
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
                 immediate=False):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        # Set True by start_scanning(); until then inotify events are
        # queued but the deque is not turned.
        self.is_ready = False
        # For tests: process events synchronously instead of via callLater.
        self._immediate = immediate

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()
        # relpaths queued (or about to be queued) for upload; used to
        # suppress duplicate events and to answer is_pending().
        self._pending = set()

        # Only some notifier implementations support a pending delay.
        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        """Begin reading inotify events; fires the 'started' hook."""
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        """
        Stop reading inotify events.  Returns a Deferred that fires once
        the notifier has stopped (where supported) and the in-progress
        processing chain (_lazy_tail) has drained.
        """
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        """
        Do the initial full scan of the local directory, then queue any
        db-known paths that the scan did not encounter (typically files
        deleted while we were not running), and start turning the deque.
        """
        self._log("start_scanning")
        self.is_ready = True
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending))
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already processed
            # (normally because they have been deleted on disk).
            self._log("adding %r" % (self._pending))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
        """
        Recursively scan the local directory at relative path reldir_u,
        adding each non-ignored child to the pending set and processing
        it.  Returns a Deferred chaining all the per-child work.
        """
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            _assert(isinstance(child, unicode), child=child)
            # child=child binds the loop variable now (late-binding fix).
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    # Returning None makes _process skip this entry.
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def is_pending(self, relpath_u):
        """True if relpath_u is queued for upload (used by Downloader
        for conflict detection)."""
        return relpath_u in self._pending

    def _notify(self, opaque, path, events_mask):
        """inotify callback: filter the event and queue its relpath."""
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
            return
        if relpath_u in self._pending:
            self._log("ignoring event for %r (already pending)" % (relpath_u,))
            return
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
            return

        self._log("appending %r to deque" % (relpath_u,))
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            if self._immediate:  # for tests
                self._turn_deque()
            else:
                self._clock.callLater(0, self._turn_deque)

    def _when_queue_is_empty(self):
        # The uploader is purely event-driven; nothing to do when idle.
        return defer.succeed(None)

    def _process(self, relpath_u):
        # Uploader
        """
        Upload one queued relpath: handle the disappeared / symlink /
        directory / regular-file / special-file cases, record the new
        version in the db, and maintain the stats counters.
        """
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            # Path was filtered out upstream (ignorable file).
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("about to remove %r from pending set %r" %
                      (relpath_u, self._pending))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')

                db_entry = self._db.get_db_entry(relpath_u)
                if db_entry is None:
                    # Never uploaded, now gone: nothing to record.
                    return None

                last_downloaded_timestamp = now  # is this correct?

                if is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                # A deletion is represented remotely as an empty file with
                # 'deleted': True in its metadata, at a bumped version.
                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                # Watch the new subdirectory unless the notifier already
                # picks up new subdirectories of a recursive watch.
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                # Directories are represented remotely as empty files whose
                # encoded name carries a trailing (encoded) u"/".
                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u =  %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                # Recurse into the new directory.
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                db_entry = self._db.get_db_entry(relpath_u)

                last_downloaded_timestamp = now

                if db_entry is None:
                    new_version = 0
                elif is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry is not None and db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                # Not a file, directory, or symlink (e.g. device node, fifo).
                self.warn("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        """Fetch remote metadata for an encoded path; Failure on KeyError."""
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        """Fetch the remote filenode for an encoded path; Failure on KeyError."""
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d
457
458
class WriteFileMixin(object):
    """
    Helpers for writing downloaded content to the local filesystem:
    temp-file-then-replace writes, conflict-file renaming, and
    deleted-file backup renaming.
    """
    # Replacement files get an mtime this many seconds in the past, so a
    # subsequent local modification is detectable via mtime change.
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        """
        Write downloaded bytes to abspath_u via a temporary file, honoring
        the configured umask.  Returns the path actually written (the
        original path, or a .conflict path if the write conflicted).
        """
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)

        # Apply the configured download umask for both the directory
        # creation and the file write, then restore the previous umask.
        old_mask = os.umask(self._umask)
        fileutil.make_dirs(head, ~ self._umask)
        fileutil.write(replacement_path_u, file_contents)
        os.umask(old_mask)

        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                # Replacement raced with a local change: reclassify.
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        """Move the replacement file aside to <abspath>.conflict and
        return that conflict path."""
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
        if os.path.isfile(replacement_path_u):
            print "%r exists" % (replacement_path_u,)
        if os.path.isfile(conflict_path_u):
            print "%r exists" % (conflict_path_u,)

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        """Rename a remotely-deleted file to <abspath>.backup (best
        effort: tolerate it already being gone)."""
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except OSError:
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u
524
525
class Downloader(QueueMixin, WriteFileMixin):
    """
    Periodically scans every DMD in the read-only collective directory,
    queues remote files that are newer than our local db versions, and
    writes them to the local directory (detecting conflicts).
    """
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode,
                 upload_readonly_dircap, clock, is_upload_pending, umask):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        # Our own DMD's readonly URI: lets us skip our own DMD when scanning.
        self._upload_readonly_dircap = upload_readonly_dircap
        # Callable (Uploader.is_pending) used for conflict detection.
        self._is_upload_pending = is_upload_pending
        self._umask = umask
        self._turn_delay = self.REMOTE_SCAN_INTERVAL

    def start_scanning(self):
        """Kick off the first remote collective scan (including our own
        DMD) and start turning the deque."""
        self._log("start_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective(scan_self=True)
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        self._turn_deque()
        return d

    def stop(self):
        """Stop turning the deque; fires when _lazy_tail has drained."""
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        db_entry = self._db.get_db_entry(relpath_u)
        if db_entry is None:
            return True
        self._log("version %r" % (db_entry.version,))
        return (db_entry.version < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode path string checks to see if this file object
        exists in our magic-folder db; if not then return None
        else check for an entry in our magic-folder db and return the version number.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        db_entry = self._db.get_db_entry(relpath_u)
        return None if db_entry is None else db_entry.version

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with the two tuple containing a
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                # dir_name=dir_name binds the loop variable (late-binding fix).
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            # Pick the (node, metadata) with the greatest metadata version;
            # (None, None) if no DMD has the file.
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
        """
        List one participant's DMD and append any entries newer than our
        local versions to scan_batch (relpath -> [(filenode, metadata)]).
        """
        self._log("_scan_remote_dmd nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    if scan_batch.has_key(relpath_u):
                        scan_batch[relpath_u] += [(file_node, metadata)]
                    else:
                        scan_batch[relpath_u] = [(file_node, metadata)]

        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")
        return d

    def _scan_remote_collective(self, scan_self=False):
        """
        Scan every DMD in the collective (skipping our own unless
        scan_self), then queue, per relpath, the candidate with the
        highest version that _should_download approves.
        """
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)

            return d2
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                # Of all candidates for this path, keep the highest version.
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    self._deque.append( (relpath_u, file_node, metadata) )
                else:
                    self._log("Excluding %r" % (relpath_u,))
                    self._call_hook(None, 'processed')

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
        return d

    def _when_queue_is_empty(self):
        # Idle: rescan the remote collective after _turn_delay seconds,
        # then resume turning the deque.
        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        # Downloader
        """
        Download one queued (relpath, filenode, metadata) item: detect
        conflicts, write/delete/mkdir locally as indicated, and record
        the result in the db.
        """
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            # A prior conflict is still unresolved; refuse to touch the file.
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
            # Conflict detection: the remote change conflicts if it does not
            # descend from what we last downloaded/uploaded, or if we have a
            # local upload of this path still pending.
            is_conflict = False
            db_entry = self._db.get_db_entry(relpath_u)
            dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
            dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            if db_entry:
                if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')
                elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
                    is_conflict = True
                    self._count('objects_conflicted')
                elif self._is_upload_pending(relpath_u):
                    is_conflict = True
                    self._count('objects_conflicted')

            if relpath_u.endswith(u"/"):
                # Trailing slash: remote entry represents a directory.
                if metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: file_node.download_best_version())
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            # Conflicts are handled (file renamed aside), not propagated.
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d