import sys, os
import os.path
from collections import deque
import time

from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
from allmydata.util.assertutil import precondition, _assert
from allmydata.util.deferredutil import HookMixin
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
     extend_filepath, unicode_from_filepath, unicode_segments_from, \
     quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
from allmydata import magicfolderdb, magicpath

defer.setDebugging(True)
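# inotify's IN_EXCL_UNLINK flag (0x04000000 in <sys/inotify.h>), defined here
# by hand because the Twisted inotify module we use does not expose it.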
IN_EXCL_UNLINK = 0x04000000L

def get_inotify_module():
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise


def is_new_file(pathinfo, db_entry):
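    """
    Return True if the file looks different from what our magic-folder db
    last recorded for it: an unknown path counts as new; a path that is
    missing on disk and already recorded as deleted (size None) does not;
    otherwise the (size, ctime, mtime) tuple is compared against the db entry.
    """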
    if db_entry is None:
        return True

    if not pathinfo.exists and db_entry.size is None:
        return False

    return ((pathinfo.size, pathinfo.ctime, pathinfo.mtime) !=
            (db_entry.size, db_entry.ctime, db_entry.mtime))


class MagicFolder(service.MultiService):
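    """
    Top-level service for one magic folder: it wires an Uploader (which
    watches the local directory and uploads changes into our upload DMD) to
    a Downloader (which polls the remote collective and writes changes into
    the local directory).
    """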
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, umask,
                 pending_delay=1.0, clock=None):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        immediate = clock is not None
        clock = clock or reactor
        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            raise Exception('ERROR: Unable to load magic folder db.')

        # for tests
        self._client = client
        self._db = db

        upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
        self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
                                     upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending, umask)

    def startService(self):
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        print "%r.startService" % (self,)
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """ready is used to signal us to start
        processing the upload and download items...
        """
        self.uploader.start_uploading()  # synchronous
        return self.downloader.start_downloading()

    def finish(self):
        print "finish"
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        return service.MultiService.disownServiceParent(self)


class QueueMixin(HookMixin):
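    """
    Machinery shared by Uploader and Downloader: the local directory, the
    magic-folder db, a deque of pending work items, and a 'lazy tail'
    Deferred onto which each item's processing is chained so that items are
    handled one at a time.
    """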
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        self._lazy_tail = defer.succeed(None)
        self._stopped = False
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

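    # _turn_deque pops one item at a time and chains its processing onto
    # self._lazy_tail, so queued items are handled strictly in order; once
    # the deque is empty, _when_queue_is_empty() decides what happens next
    # (the Uploader just stops; the Downloader schedules another remote scan).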
    def _turn_deque(self):
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            print "_turn_deque else clause"
            def whawhat(result):
                print "result %r" % (result,)
                return result
            self._lazy_tail.addBoth(whawhat)
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))


class Uploader(QueueMixin):
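    """
    Watches the local directory for filesystem events (via inotify or the
    Windows equivalent), queues the affected relative paths, and uploads new
    versions into our upload DMD. A full scan of the directory is also
    performed periodically to catch anything the notifier missed.
    """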
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
                 immediate=False):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False
        self._immediate = immediate

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()
        self._pending = set()  # of unicode relpaths

        self._periodic_full_scan_duration = 10 * 60  # perform a full scan every 10 minutes

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        self.periodic_callid.cancel()
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_uploading(self):
        self._log("start_uploading")
        self.is_ready = True

        all_relpaths = self._db.get_all_relpaths()
        self._log("all relpaths: %r" % (all_relpaths,))

        for relpath_u in all_relpaths:
            self._add_pending(relpath_u)

        self._full_scan()

    def _extend_queue_and_keep_going(self, relpaths_u):
        self._log("_extend_queue_and_keep_going %r" % (relpaths_u,))
        self._deque.extend(relpaths_u)
        self._count('objects_queued', len(relpaths_u))

        if self.is_ready:
            if self._immediate:  # for tests
                self._turn_deque()
            else:
                self._clock.callLater(0, self._turn_deque)

    def _full_scan(self):
        self.periodic_callid = self._clock.callLater(self._periodic_full_scan_duration, self._full_scan)
        print "FULL SCAN"
        self._log("_pending %r" % (self._pending,))
        self._scan(u"")
        self._extend_queue_and_keep_going(self._pending)

    def _add_pending(self, relpath_u):
        self._log("add pending %r" % (relpath_u,))
        if not magicpath.should_ignore_file(relpath_u):
            self._pending.add(relpath_u)

    def _scan(self, reldir_u):
        # Scan a directory by (synchronously) adding the paths of all its children to self._pending.
        # Note that this doesn't add them to the deque -- that happens afterwards, when the
        # caller passes self._pending to _extend_queue_and_keep_going.

        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        for child in children:
            _assert(isinstance(child, unicode), child=child)
            self._add_pending("%s/%s" % (reldir_u, child) if reldir_u != u"" else child)

    def is_pending(self, relpath_u):
        return relpath_u in self._pending

    def _notify(self, opaque, path, events_mask):
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
            return
        if relpath_u in self._pending:
            self._log("not queueing %r because it is already pending" % (relpath_u,))
            return
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
            return

        self._pending.add(relpath_u)
        self._extend_queue_and_keep_going([relpath_u])

    def _when_queue_is_empty(self):
        return defer.succeed(None)

    def _process(self, relpath_u):
        # Uploader
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("about to remove %r from pending set %r" %
                      (relpath_u, self._pending))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')

                db_entry = self._db.get_db_entry(relpath_u)
                if db_entry is None:
                    return None

                last_downloaded_timestamp = now  # is this correct?

                if is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                self._log("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                print "ISDIR "
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u = %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _dir_succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _dir_failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_dir_succeeded, _dir_failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                upload_d.addCallback(lambda ign: self._extend_queue_and_keep_going(self._pending))
                return upload_d
            elif pathinfo.isfile:
                db_entry = self._db.get_db_entry(relpath_u)

                last_downloaded_timestamp = now

                if db_entry is None:
                    new_version = 0
                elif is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry is not None and db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                self._log("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d


class WriteFileMixin(object):
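    """
    Helpers for writing downloaded content to the local filesystem: contents
    are written to a temporary file whose mtime is backdated by FUDGE_SECONDS,
    which then replaces the target file (keeping a .backup copy); if the
    replacement cannot be done safely, the new content is renamed to a
    .conflict file instead.
    """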
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)

        old_mask = os.umask(self._umask)
        try:
            fileutil.make_dirs(head, (~ self._umask) & 0777)
            fileutil.write(replacement_path_u, file_contents)
        finally:
            os.umask(old_mask)

        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
        if os.path.isfile(replacement_path_u):
            print "%r exists" % (replacement_path_u,)
        if os.path.isfile(conflict_path_u):
            print "%r exists" % (conflict_path_u,)

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except OSError:
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u


class Downloader(QueueMixin, WriteFileMixin):
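    """
    Periodically scans every other participant's DMD in the collective,
    queues any remote file whose version is newer than the one recorded in
    our magic-folder db, downloads it, and writes it into the local
    directory (detecting conflicts along the way).
    """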
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode,
                 upload_readonly_dircap, clock, is_upload_pending, umask):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        self._upload_readonly_dircap = upload_readonly_dircap
        self._is_upload_pending = is_upload_pending
        self._umask = umask

    def start_downloading(self):
        self._log("start_downloading")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective(scan_self=True)
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        self._turn_deque()
        return d

    def stop(self):
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        Return True if the remote object should be downloaded: ignorable
        paths are never downloaded, and otherwise we compare the remote
        metadata version against the version in our magic-folder db; the
        latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        db_entry = self._db.get_db_entry(relpath_u)
        if db_entry is None:
            return True
        self._log("version %r" % (db_entry.version,))
        return (db_entry.version < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode relative path and checks whether the
        corresponding file exists locally; if not it returns None, otherwise
        it returns the version number recorded for that path in our
        magic-folder db (or None if the db has no entry).
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        db_entry = self._db.get_db_entry(relpath_u)
        return None if db_entry is None else db_entry.version

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file
        managed by magic-folder, and returns a deferred that fires with a
        two-tuple of the file node and metadata for the latest version of
        that file in the magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
        self._log("_scan_remote_dmd nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    if relpath_u in scan_batch:
                        scan_batch[relpath_u] += [(file_node, metadata)]
                    else:
                        scan_batch[relpath_u] = [(file_node, metadata)]

        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")
        return d

    def _scan_remote_collective(self, scan_self=False):
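        """
        List the collective directory and scan each participant's DMD
        (skipping our own unless scan_self is True), batching candidate
        (file node, metadata) pairs per relative path; then queue, for each
        path, the highest version that _should_download approves.
        """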
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)

            return d2
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    self._deque.append( (relpath_u, file_node, metadata) )
                else:
                    self._log("Excluding %r" % (relpath_u,))
                    self._call_hook(None, 'processed')

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
        return d

    def _when_queue_is_empty(self):
        d = task.deferLater(self._clock, self.REMOTE_SCAN_INTERVAL, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        # Downloader
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
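            # Conflict detection: the remote change counts as a conflict if the
            # DMD's last_downloaded_uri no longer matches the one in our db, if
            # its last_uploaded_uri differs from ours, or if we have a local
            # upload for this same path still pending.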
            is_conflict = False
            db_entry = self._db.get_db_entry(relpath_u)
            dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
            dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            if db_entry:
                if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')
                elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
                    is_conflict = True
                    self._count('objects_conflicted')
                elif self._is_upload_pending(relpath_u):
                    is_conflict = True
                    self._count('objects_conflicted')

            if relpath_u.endswith(u"/"):
                if metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: file_node.download_best_version())
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d