]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/frontends/magic_folder.py
224b60976865be39b90101075eccadb4dffd0266
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / magic_folder.py
1
2 import sys, os
3 import os.path
4 from collections import deque
5 import time
6
7 from twisted.internet import defer, reactor, task
8 from twisted.python.failure import Failure
9 from twisted.python import runtime
10 from twisted.application import service
11
12 from allmydata.util import fileutil
13 from allmydata.interfaces import IDirectoryNode
14 from allmydata.util import log
15 from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
16 from allmydata.util.assertutil import precondition, _assert
17 from allmydata.util.deferredutil import HookMixin
18 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
19      extend_filepath, unicode_from_filepath, unicode_segments_from, \
20      quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
21 from allmydata.immutable.upload import FileName, Data
22 from allmydata import magicfolderdb, magicpath
23
24
# inotify flag (see inotify(7)): stop generating events for watched children
# after they have been unlinked.  Not exposed by all inotify bindings, so we
# define the raw value here.  FIX: use a plain int literal — the Python 2
# "L" long suffix was unnecessary (the value fits in an int and behaves
# identically in bitmask arithmetic) and is a syntax error under Python 3.
IN_EXCL_UNLINK = 0x04000000
26
def get_inotify_module():
    """
    Select and return the filesystem-notification module for this platform:
    the allmydata Windows shim on win32, Twisted's inotify wrapper on Linux.

    Raises NotImplementedError when no supported notification mechanism is
    available; import problems are logged and re-raised (with a friendlier
    NotImplementedError on Windows).
    """
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "This currently requires Linux or Windows.")
    except (ImportError, AttributeError) as err:
        log.msg(err)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise
    return inotify
43
44
def is_new_file(pathinfo, db_entry):
    """
    Return True if the file described by ``pathinfo`` differs from what the
    magic-folder db last recorded in ``db_entry``, i.e. it needs uploading.

    ``pathinfo`` must have ``exists``/``size``/``ctime``/``mtime`` attributes
    (as returned by ``get_pathinfo``); ``db_entry`` has ``size``/``ctime``/
    ``mtime`` or is None when the file was never recorded.

    FIX: removed leftover debug ``print(...)`` calls (which, as this is a
    Python 2 module, actually printed tuples to stdout on every comparison).
    """
    if db_entry is None:
        # Never seen before: definitely new.
        return True

    # File is gone locally and the db already records it as deleted
    # (size is None) — nothing new to upload.
    if not pathinfo.exists and db_entry.size is None:
        return False

    # Otherwise, any change in (size, ctime, mtime) counts as new.
    return ((pathinfo.size, pathinfo.ctime, pathinfo.mtime) !=
            (db_entry.size, db_entry.ctime, db_entry.mtime))
59
60
class MagicFolder(service.MultiService):
    """
    Top-level service for one magic folder: constructs and owns the Uploader
    (local filesystem -> Tahoe upload DMD) and Downloader (collective DMDs ->
    local filesystem) and manages their lifecycle.
    """
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, umask,
                 pending_delay=1.0, clock=None):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        # A caller-supplied clock means we are under test: turn the queue
        # synchronously instead of via reactor.callLater.
        immediate = clock is not None
        clock = clock or reactor
        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            # FIX: this used to be "return Failure(Exception(...))".  Returning
            # a non-None value from __init__ raises a confusing TypeError and
            # the Failure was never delivered to anyone.  Raise the intended
            # error directly instead.
            raise Exception('ERROR: Unable to load magic folder db.')

        # for tests
        self._client = client
        self._db = db

        upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
        self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
                                     upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending, umask)

    def startService(self):
        """Start the service and begin filesystem monitoring (idempotent)."""
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """ready is used to signal us to start
        processing the upload and download items...
        """
        d = self.uploader.start_scanning()
        d2 = self.downloader.start_scanning()
        d.addCallback(lambda ign: d2)
        return d

    def finish(self):
        """Stop both queues; the returned Deferred fires when both have stopped."""
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        """Detach this service from its parent."""
        return service.MultiService.disownServiceParent(self)
113
114
class QueueMixin(HookMixin):
    """
    Shared machinery for Uploader and Downloader: a work deque drained one
    item at a time via the reactor (or a test-supplied clock), plus logging
    and stats-counter helpers.  Subclasses must provide ``_process(item)``
    and ``_when_queue_is_empty()``.
    """
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        # Hooks let the test suite observe queue progress.
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        # _lazy_tail serializes item processing: each new step is chained
        # onto the tail so only one item is in flight at a time.
        self._lazy_tail = defer.succeed(None)
        self._stopped = False
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        """Map a magic-folder-relative unicode path to a FilePath under the local dir."""
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        """Inverse of _get_filepath: FilePath -> u'/'-joined relative path."""
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        """Adjust a stats counter namespaced to this magic folder and queue name."""
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        """Deferred-friendly logger: log ``msg`` with ``res`` and pass ``res`` through."""
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        # FIX: removed the leftover debug "print s" (and commented-out file
        # append); client.log is the supported logging channel.
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)

    def _turn_deque(self):
        """
        Pop one item off the deque and chain its processing onto _lazy_tail;
        when the deque is empty, defer to the subclass's _when_queue_is_empty().
        """
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
182
183
class Uploader(QueueMixin):
    """
    Watches the local magic folder directory via inotify (or the Windows
    equivalent) and uploads created/changed/deleted files to the Tahoe
    upload DMD, recording each version in the magic-folder db.
    """
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
                 immediate=False):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False
        self._immediate = immediate

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()
        # relpaths queued (or about to be queued) for upload
        self._pending = set()

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        # Watch non-recursively; subdirectories are watched individually as
        # they are discovered/created (see the isdir branch of _process).
        # (Removed a leftover debug "print" here.)
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=False)

    def start_monitoring(self):
        """Begin reading inotify events; fires the 'started' test hook."""
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        """Stop watching and wait for any in-flight queue processing to finish."""
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        """
        Scan the whole local directory, then also queue any db entries not
        seen during the scan (normally files deleted while we were offline),
        and start turning the deque.
        """
        self._log("start_scanning")
        self.is_ready = True
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending))
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already processed
            # (normally because they have been deleted on disk).
            self._log("adding %r" % (self._pending))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
        """List ``reldir_u`` and process each non-ignored child (recursing via _process)."""
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            _assert(isinstance(child, unicode), child=child)
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def is_pending(self, relpath_u):
        """True if ``relpath_u`` is queued (or being queued) for upload."""
        return relpath_u in self._pending

    def _notify(self, opaque, path, events_mask):
        """inotify callback: filter the event and queue the affected path."""
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
            return
        if relpath_u in self._pending:
            self._log("ignoring event for %r (already pending)" % (relpath_u,))
            return
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
            return

        self._log("appending %r to deque" % (relpath_u,))
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            if self._immediate:  # for tests
                self._turn_deque()
            else:
                self._clock.callLater(0, self._turn_deque)

    def _when_queue_is_empty(self):
        # The uploader is event-driven (inotify); nothing to do when idle.
        return defer.succeed(None)

    def _process(self, relpath_u):
        """
        Upload one local path: handle disappearance (record a deletion),
        symlinks (warn), directories (create in the DMD, watch, and recurse),
        regular files (upload a new version), and special files (warn).
        """
        # Uploader
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("about to remove %r from pending set %r" %
                      (relpath_u, self._pending))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')

                db_entry = self._db.get_db_entry(relpath_u)
                if db_entry is None:
                    return None

                last_downloaded_timestamp = now  # is this correct?

                if is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                # Record the deletion in the DMD as an empty file with
                # 'deleted' metadata, bumping the version.
                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                # Watch the new subdirectory unless the notifier's recursive
                # watch already covers newly-created subdirectories.
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u =  %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                db_entry = self._db.get_db_entry(relpath_u)

                last_downloaded_timestamp = now

                if db_entry is None:
                    new_version = 0
                elif is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry is not None and db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                self.warn("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        """Fetch DMD metadata for a path; a missing entry becomes a Failure."""
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        """Fetch the DMD child node for a path; a missing entry becomes a Failure."""
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d
463
464
class WriteFileMixin(object):
    """
    Helpers for writing downloaded file contents to the local filesystem
    with conflict handling (mixed into Downloader).
    """
    # mtime of downloaded files is set this many seconds in the past so that
    # a subsequent local edit is detected as newer.
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        """Return the path used to park a conflicting version of abspath_u."""
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)

        old_mask = os.umask(self._umask)
        try:
            # FIX: 0o777 replaces the Python-2-only octal literal 0777
            # (identical value; also valid syntax on Python 3).
            fileutil.make_dirs(head, (~ self._umask) & 0o777)
            fileutil.write(replacement_path_u, file_contents)
        finally:
            os.umask(old_mask)

        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            # (Removed a leftover debug "print" here.)
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                # The replace raced with a local change; treat as a conflict.
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        """Move the replacement file aside as foo.conflict; return that path.

        (Removed leftover debug "print" diagnostics.)
        """
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))
        conflict_path_u = self._get_conflicted_filename(abspath_u)
        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        """Best-effort rename of a remotely-deleted file to foo.backup."""
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except OSError:
            # Already deleted locally; nothing to back up.
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u
532
533
class Downloader(QueueMixin, WriteFileMixin):
    """
    Periodically scans the collective directory's DMDs for versions newer
    than what the local magic-folder db records, downloads them, and writes
    them to the local directory with conflict detection.
    """
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode,
                 upload_readonly_dircap, clock, is_upload_pending, umask):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        # our own DMD's readonly URI, used to skip it during collective scans
        self._upload_readonly_dircap = upload_readonly_dircap
        # callable from the Uploader: is this relpath awaiting upload?
        self._is_upload_pending = is_upload_pending
        self._umask = umask

    def start_scanning(self):
        """Do an initial collective scan (including our own DMD) and start the queue."""
        self._log("start_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective(scan_self=True)
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        self._turn_deque()
        return d

    def stop(self):
        """Stop turning the deque; fires when in-flight processing finishes."""
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        db_entry = self._db.get_db_entry(relpath_u)
        if db_entry is None:
            return True
        self._log("version %r" % (db_entry.version,))
        return (db_entry.version < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode path string checks to see if this file object
        exists in our magic-folder db; if not then return None
        else check for an entry in our magic-folder db and return the version number.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        db_entry = self._db.get_db_entry(relpath_u)
        return None if db_entry is None else db_entry.version

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with the two tuple containing a
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            # Pick the (node, metadata) pair with the largest 'version';
            # (None, None) when no participant has the file.
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
        """List one participant's DMD and collect candidate downloads into scan_batch."""
        self._log("_scan_remote_dmd nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    # FIX: replaced deprecated dict.has_key() with setdefault
                    # (same behavior: append to a per-path candidate list).
                    scan_batch.setdefault(relpath_u, []).append((file_node, metadata))

        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")
        return d

    def _scan_remote_collective(self, scan_self=False):
        """Scan every participant's DMD (skipping our own unless scan_self) and
        queue the highest-version candidate for each path that should be downloaded."""
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)

            return d2
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    self._deque.append( (relpath_u, file_node, metadata) )
                else:
                    self._log("Excluding %r" % (relpath_u,))
                    self._call_hook(None, 'processed')

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
        return d

    def _when_queue_is_empty(self):
        # Idle: schedule the next periodic collective scan, then turn again.
        d = task.deferLater(self._clock, self.REMOTE_SCAN_INTERVAL, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        """
        Download one queued (relpath, filenode, metadata) item: detect
        conflicts against the db and pending uploads, then mkdir / rename-as-
        deleted / download-and-write as appropriate, and record the result.
        """
        # Downloader
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            # A previous conflict is still unresolved; refuse to touch the file.
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
            is_conflict = False
            db_entry = self._db.get_db_entry(relpath_u)
            dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
            dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            if db_entry:
                # Conflict when the remote change is not based on what we last
                # downloaded/uploaded, or when we have our own upload pending.
                if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')
                elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
                    is_conflict = True
                    self._count('objects_conflicted')
                elif self._is_upload_pending(relpath_u):
                    is_conflict = True
                    self._count('objects_conflicted')

            if relpath_u.endswith(u"/"):
                if metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: file_node.download_best_version())
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            # Conflicts are handled by leaving the .conflict file; swallow here.
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d