]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/frontends/magic_folder.py
abb0a1ff5e4d77252af98b29d0b8dde77a00933f
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / magic_folder.py
1
2 import sys, os
3 import os.path
4 from collections import deque
5 import time
6
7 from twisted.internet import defer, reactor, task
8 from twisted.python.failure import Failure
9 from twisted.python import runtime
10 from twisted.application import service
11
12 from allmydata.util import fileutil
13 from allmydata.interfaces import IDirectoryNode
14 from allmydata.util import log
15 from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
16 from allmydata.util.assertutil import precondition, _assert
17 from allmydata.util.deferredutil import HookMixin
18 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
19      extend_filepath, unicode_from_filepath, unicode_segments_from, \
20      quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
21 from allmydata.immutable.upload import FileName, Data
22 from allmydata import magicfolderdb, magicpath
23
24
# Linux inotify flag IN_EXCL_UNLINK, not exposed by Twisted's inotify
# constants: stops the kernel from delivering events for watched-directory
# children after they have been unlinked.
IN_EXCL_UNLINK = 0x04000000L
26
def get_inotify_module():
    """Return the filesystem-notification module for this platform.

    On Windows this is allmydata.windows.inotify; on platforms where
    Twisted supports inotify it is twisted.internet.inotify.  Raises
    NotImplementedError on unsupported platforms, and logs then re-raises
    ImportError/AttributeError from the platform probe (with a friendlier
    NotImplementedError on Windows, where old OS versions are unsupported).
    """
    on_windows = (sys.platform == "win32")
    try:
        if on_windows:
            from allmydata.windows import inotify
            return inotify
        if runtime.platform.supportsINotify():
            from twisted.internet import inotify
            return inotify
        # Not caught below: the except clause only handles import/probe errors.
        raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                  "This currently requires Linux or Windows.")
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if on_windows:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise
43
44
def is_new_file(pathinfo, db_entry):
    """Return True if the local file state differs from the recorded db entry.

    A path with no db entry at all is always new.  A path that neither
    exists on disk nor has a recorded size (already deleted on both sides)
    is not new.  Otherwise compare the (size, ctime, mtime) triples.
    """
    if db_entry is None:
        return True
    if db_entry.size is None and not pathinfo.exists:
        return False
    current = (pathinfo.size, pathinfo.ctime, pathinfo.mtime)
    recorded = (db_entry.size, db_entry.ctime, db_entry.mtime)
    return current != recorded
54
55
56 class MagicFolder(service.MultiService):
57     name = 'magic-folder'
58
59     def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, umask,
60                  pending_delay=1.0, clock=None):
61         precondition_abspath(local_path_u)
62
63         service.MultiService.__init__(self)
64
65         immediate = clock is not None
66         clock = clock or reactor
67         db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
68         if db is None:
69             return Failure(Exception('ERROR: Unable to load magic folder db.'))
70
71         # for tests
72         self._client = client
73         self._db = db
74
75         upload_dirnode = self._client.create_node_from_uri(upload_dircap)
76         collective_dirnode = self._client.create_node_from_uri(collective_dircap)
77
78         self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
79         self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
80                                      upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending, umask)
81
82     def startService(self):
83         # TODO: why is this being called more than once?
84         if self.running:
85             return defer.succeed(None)
86         print "%r.startService" % (self,)
87         service.MultiService.startService(self)
88         return self.uploader.start_monitoring()
89
90     def ready(self):
91         """ready is used to signal us to start
92         processing the upload and download items...
93         """
94         self.uploader.start_uploading()  # synchronous
95         return self.downloader.start_downloading()
96
97     def finish(self):
98         print "finish"
99         d = self.uploader.stop()
100         d2 = self.downloader.stop()
101         d.addCallback(lambda ign: d2)
102         return d
103
104     def remove_service(self):
105         return service.MultiService.disownServiceParent(self)
106
107
class QueueMixin(HookMixin):
    """Shared machinery for Uploader and Downloader: a work deque drained
    one item at a time through a lazily-extended Deferred chain
    (self._lazy_tail).

    Subclasses must provide _process(item) and _when_queue_is_empty().
    Hooks: 'processed' fires after each item; 'started' when monitoring
    begins (set by the subclass).
    """
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        # Queue name ('uploader'/'downloader'); used in stats counters and log lines.
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        # The configured local directory must already exist and be a directory.
        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        # Tail of the Deferred chain that serializes item processing.
        self._lazy_tail = defer.succeed(None)
        self._stopped = False
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        # Map a u"/"-separated relative path to a FilePath under the magic folder.
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        # Inverse of _get_filepath: FilePath -> u"/"-joined relative path.
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        # Bump a stats counter namespaced by this queue's name.
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        # Pass-through Deferred callback: log and return the result unchanged.
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _turn_deque(self):
        # Pop one item and chain its processing onto _lazy_tail, then
        # schedule the next turn; when the deque is empty, delegate to the
        # subclass's idle behavior via _when_queue_is_empty().
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
175
176
class Uploader(QueueMixin):
    """Watches the local magic-folder directory with inotify and uploads
    created/changed/deleted files into the upload DMD, recording each
    upload in the magic-folder db.
    """
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
                 immediate=False):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False
        # immediate=True (tests) turns the deque synchronously instead of via callLater.
        self._immediate = immediate

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()
        self._pending = set()  # of unicode relpaths

        self._periodic_full_scan_duration = 10 * 60 # perform a full scan every 10 minutes

        # Not all notifier implementations support delay coalescing.
        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        # Begin reading inotify events; fires the 'started' hook.
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        # NOTE(review): self.periodic_callid is only set by
        # _periodic_full_scan (via start_uploading); calling stop() before
        # start_uploading() would raise AttributeError -- confirm callers.
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        self.periodic_callid.cancel()
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_uploading(self):
        # Re-queue every path already known to the db, start the periodic
        # full-scan timer, and begin turning the deque.
        self._log("start_uploading")
        self.is_ready = True

        all_relpaths = self._db.get_all_relpaths()
        self._log("all relpaths: %r" % (all_relpaths,))

        for relpath_u in all_relpaths:
            self._add_pending(relpath_u)

        self._periodic_full_scan()
        self._extend_queue_and_keep_going(self._pending)

    def _extend_queue_and_keep_going(self, relpaths_u):
        # Append paths to the work deque and (if ready) schedule a turn.
        self._log("queueing %r" % (relpaths_u,))
        self._deque.extend(relpaths_u)
        self._count('objects_queued', len(relpaths_u))

        if self.is_ready:
            if self._immediate:  # for tests
                self._turn_deque()
            else:
                self._clock.callLater(0, self._turn_deque)

    def _periodic_full_scan(self):
        # Reschedule itself, then scan only when no uploads are pending.
        self.periodic_callid = self._clock.callLater(self._periodic_full_scan_duration, self._periodic_full_scan)
        if len(self._pending) == 0:
            self._full_scan()

    def _full_scan(self):
        print "FULL SCAN"
        self._log("_pending %r" % (self._pending))
        self._scan(u"")

    def _add_pending(self, relpath_u):
        # Ignorable paths (e.g. our own temp/backup artifacts) are filtered here.
        if not magicpath.should_ignore_file(relpath_u):
            self._pending.add(relpath_u)

    def _scan(self, reldir_u):
        # Scan a directory by (synchronously) adding the paths of all its children to self._pending.
        # Note that this does not add them to the deque -- the caller is
        # responsible for queueing the collected paths afterwards (see
        # _extend_queue_and_keep_going).

        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        for child in children:
            _assert(isinstance(child, unicode), child=child)
            self._add_pending("%s/%s" % (reldir_u, child) if reldir_u != u"" else child)

    def is_pending(self, relpath_u):
        # Used by Downloader to detect upload/download races (conflicts).
        return relpath_u in self._pending

    def _notify(self, opaque, path, events_mask):
        # inotify callback: translate the event into a pending relpath.
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
            return
        if relpath_u in self._pending:
            self._log("not queueing %r because it is already pending" % (relpath_u,))
            return
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
            return

        self._pending.add(relpath_u)
        self._extend_queue_and_keep_going([relpath_u])

    def _when_queue_is_empty(self):
        # Uploader has nothing to do when idle; inotify will wake us up.
        return defer.succeed(None)

    def _process(self, relpath_u):
        # Uploader
        # Process one queued relpath: decide between the deleted / symlink /
        # directory / regular-file cases and upload accordingly.
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("about to remove %r from pending set %r" %
                      (relpath_u, self._pending))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')

                db_entry = self._db.get_db_entry(relpath_u)
                if db_entry is None:
                    # Never uploaded, now gone: nothing to record.
                    return None

                last_downloaded_timestamp = now  # is this correct?

                if is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                # A deletion is uploaded as an empty file with 'deleted' metadata.
                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                # NOTE(review): self.warn is not defined on Uploader or
                # QueueMixin -- this branch looks like it would raise
                # AttributeError; confirm and consider self._log instead.
                self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                # Directories are represented by an empty file whose encoded
                # name ends with the encoded u"/".
                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u =  %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _dir_succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _dir_failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_dir_succeeded, _dir_failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                upload_d.addCallback(lambda ign: self._extend_queue_and_keep_going(self._pending))
                return upload_d
            elif pathinfo.isfile:
                db_entry = self._db.get_db_entry(relpath_u)

                last_downloaded_timestamp = now

                if db_entry is None:
                    new_version = 0
                elif is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry is not None and db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                # NOTE(review): same undefined self.warn as the symlink branch.
                self.warn("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        # get_metadata_for may raise KeyError synchronously; convert to a Failure.
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        # As above, for fetching the child node itself.
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d
463
464
class WriteFileMixin(object):
    """Helpers for atomically writing downloaded content to the local
    filesystem, with conflict (.conflict) and backup (.backup) handling.
    Requires self._umask and self._log from the mixing class.
    """
    # Replacement files get an mtime this many seconds in the past, so a
    # subsequent local edit is detectably newer.
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)

        # Apply the magic-folder umask while creating the directory and file.
        old_mask = os.umask(self._umask)
        try:
            fileutil.make_dirs(head, (~ self._umask) & 0777)
            fileutil.write(replacement_path_u, file_contents)
        finally:
            os.umask(old_mask)

        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                # Replacement raced with a local change: reclassify as conflict.
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        # Move the downloaded replacement aside as foo.conflict and return that path.
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
        if os.path.isfile(replacement_path_u):
            print "%r exists" % (replacement_path_u,)
        if os.path.isfile(conflict_path_u):
            print "%r exists" % (conflict_path_u,)

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        # Best-effort: a remotely-deleted file becomes foo.backup locally;
        # if it is already gone, just log and carry on.
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except OSError:
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u
532
533
class Downloader(QueueMixin, WriteFileMixin):
    """Periodically scans every DMD in the collective, queues remote files
    whose version is newer than our db's, downloads them, and writes them
    locally with conflict detection.
    """
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode,
                 upload_readonly_dircap, clock, is_upload_pending, umask):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        # Readonly URI of our own DMD, used to skip scanning ourselves.
        self._upload_readonly_dircap = upload_readonly_dircap
        # Callable (Uploader.is_pending) used to detect upload/download races.
        self._is_upload_pending = is_upload_pending
        self._umask = umask

    def start_downloading(self):
        # Initial scan includes our own DMD (scan_self=True) to pick up
        # anything written while we were offline.
        self._log("start_downloading")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective(scan_self=True)
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        self._turn_deque()
        return d

    def stop(self):
        # Mark stopped so _turn_deque halts; wait for in-flight work.
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        db_entry = self._db.get_db_entry(relpath_u)
        if db_entry is None:
            return True
        self._log("version %r" % (db_entry.version,))
        return (db_entry.version < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode path string checks to see if this file object
        exists in our magic-folder db; if not then return None
        else check for an entry in our magic-folder db and return the version number.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        db_entry = self._db.get_db_entry(relpath_u)
        return None if db_entry is None else db_entry.version

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with the two tuple containing a
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                # dir_name is bound as a default arg to avoid the
                # late-binding-closure pitfall in this loop.
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            # Pick the (node, metadata) with the largest metadata version;
            # (None, None) if no participant has the file.
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
        # List one participant's DMD and accumulate candidate downloads
        # (grouped per relpath) into the shared scan_batch dict.
        self._log("_scan_remote_dmd nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    if scan_batch.has_key(relpath_u):
                        scan_batch[relpath_u] += [(file_node, metadata)]
                    else:
                        scan_batch[relpath_u] = [(file_node, metadata)]

        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")
        return d

    def _scan_remote_collective(self, scan_self=False):
        # Scan every participant DMD (skipping our own unless scan_self),
        # then queue the highest-version candidate for each path.
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)

            return d2
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                # Highest version wins among all participants' copies.
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    self._deque.append( (relpath_u, file_node, metadata) )
                else:
                    self._log("Excluding %r" % (relpath_u,))
                    self._call_hook(None, 'processed')

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
        return d

    def _when_queue_is_empty(self):
        # Idle behavior: rescan the collective after REMOTE_SCAN_INTERVAL,
        # then turn the deque again.
        d = task.deferLater(self._clock, self.REMOTE_SCAN_INTERVAL, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        # Downloader
        # Process one queued (relpath, file_node, metadata) item: detect
        # conflicts, then mkdir / rename-deleted / download-and-write, and
        # finally record the result in the db.
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            # NOTE(review): did_upload_version is also the recording call for
            # downloads here -- the db appears to share one row shape for both.
            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            # An unresolved earlier conflict blocks further downloads of this path.
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
            # Conflict detection: a mismatch between the remote DMD's record
            # and our db, or a concurrently-pending local upload, marks the
            # incoming write as a conflict.
            is_conflict = False
            db_entry = self._db.get_db_entry(relpath_u)
            dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
            dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            if db_entry:
                if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')
                elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
                    is_conflict = True
                    self._count('objects_conflicted')
                elif self._is_upload_pending(relpath_u):
                    is_conflict = True
                    self._count('objects_conflicted')

            if relpath_u.endswith(u"/"):
                # Trailing slash marks a directory entry.
                if metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: file_node.download_best_version())
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            # ConflictError is an expected outcome, not a failure of the queue.
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d