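# magic_folder.py: the Magic Folder frontend. A MagicFolder service couples an
# Uploader, which watches a local directory with inotify and pushes changes to
# an upload dircap (our DMD), with a Downloader, which periodically polls the
# collective dircap for other clients' changes and writes them to local disk.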

import sys, os
import os.path
from collections import deque
import time

from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
from allmydata.util.assertutil import precondition, _assert
from allmydata.util.deferredutil import HookMixin
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
     extend_filepath, unicode_from_filepath, unicode_segments_from, \
     quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
from allmydata import magicfolderdb, magicpath


# The Linux inotify flag IN_EXCL_UNLINK (suppress events for children of a
# watched directory after they have been unlinked), defined here because
# Twisted's inotify module does not expose it.
IN_EXCL_UNLINK = 0x04000000L

def get_inotify_module():
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise

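# A minimal sketch of how the uploader consumes this module (see
# Uploader.__init__ below): obtain the platform's inotify implementation,
# create a notifier, and watch a directory tree.
#
#   inotify = get_inotify_module()
#   notifier = inotify.INotify()
#   notifier.watch(to_filepath(u"/some/dir"), mask=inotify.IN_CLOSE_WRITE,
#                  callbacks=[on_event], recursive=True)
#
# Here u"/some/dir" and on_event are hypothetical; the real mask is built in
# Uploader.__init__.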

def is_new_file(pathinfo, db_entry):
    """Return True if the local file is new or changed relative to its db entry.

    A missing db_entry means we have never seen this file. A nonexistent path
    whose db entry also records no size was already deleted, hence not new.
    Otherwise, compare (size, ctime, mtime) against what the db recorded.
    """
    if db_entry is None:
        return True

    if not pathinfo.exists and db_entry.size is None:
        return False

    return ((pathinfo.size, pathinfo.ctime, pathinfo.mtime) !=
            (db_entry.size, db_entry.ctime, db_entry.mtime))


class MagicFolder(service.MultiService):
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, umask,
                 pending_delay=1.0, clock=None):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        immediate = clock is not None
        clock = clock or reactor
        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            raise Exception('ERROR: Unable to load magic folder db.')

        # for tests
        self._client = client
        self._db = db

        upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
        self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
                                     upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending, umask)

    def startService(self):
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        print "%r.startService" % (self,)
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """ready is used to signal us to start processing the upload and
        download queues.
        """
        d = self.uploader.start_scanning()
        d2 = self.downloader.start_scanning()
        d.addCallback(lambda ign: d2)
        return d

    def finish(self):
        print "finish"
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        return service.MultiService.disownServiceParent(self)

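# A hedged usage sketch (the client object and caps are whatever the Tahoe
# node provides; the names here are illustrative):
#
#   mf = MagicFolder(client, upload_dircap, collective_dircap,
#                    u"/home/alice/magic", dbfile, umask=0077)
#   mf.setServiceParent(client)   # startService() begins inotify monitoring
#   mf.ready()                    # start the initial upload/download scans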

class QueueMixin(HookMixin):
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        self._lazy_tail = defer.succeed(None)
        self._stopped = False
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

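    # Queue mechanics: _notify and the scanners append work items to
    # self._deque, and _turn_deque pops one item at a time, chaining each
    # _process call onto self._lazy_tail so items are handled strictly one
    # after another. Note that deque.pop() takes from the right, the same end
    # that append() adds to, so the most recently queued item is processed
    # first.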
    def _turn_deque(self):
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))


class Uploader(QueueMixin):
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
                 immediate=False):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False
        self._immediate = immediate

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()
        self._pending = set()

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
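        # Roughly: IN_CREATE is acted on only for new subdirectories (see
        # _notify), IN_CLOSE_WRITE and IN_MOVED_TO queue uploads of file
        # contents, and IN_DELETE / IN_MOVED_FROM queue uploads of deletion
        # records (the 'disappeared' branch of _process).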
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        self._log("start_scanning")
        self.is_ready = True
        return self._full_scan()

    def _full_scan(self):
        print "FULL SCAN"
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending,))
        d = self._scan(u"")
        return d

    def _scan(self, reldir_u):
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            _assert(isinstance(child, unicode), child=child)
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    return None

                self._pending.add(relpath_u)
            d.addCallback(_add_pending)
        def _enqueue_pending(ign):
            self._log("adding %r" % (self._pending,))
            self._deque.extend(self._pending)
        d.addCallback(_enqueue_pending)
        return d

    def is_pending(self, relpath_u):
        return relpath_u in self._pending

    def _notify(self, opaque, path, events_mask):
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

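        # For example, copying a new file 'foo' into the folder typically
        # produces IN_CREATE('foo') followed by IN_CLOSE_WRITE('foo'); we
        # ignore the former and queue an upload on the latter.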
        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
            return
        if relpath_u in self._pending:
            self._log("ignoring event for %r (already pending)" % (relpath_u,))
            return
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
            return

        self._log("appending %r to deque" % (relpath_u,))
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            if self._immediate:  # for tests
                self._turn_deque()
            else:
                self._clock.callLater(0, self._turn_deque)

    def _when_queue_is_empty(self):
        return defer.succeed(None)

    def _process(self, relpath_u):
        # Uploader
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("about to remove %r from pending set %r" %
                      (relpath_u, self._pending))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')

                db_entry = self._db.get_db_entry(relpath_u)
                if db_entry is None:
                    return None

                last_downloaded_timestamp = now  # is this correct?

                if is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u = %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                db_entry = self._db.get_db_entry(relpath_u)

                last_downloaded_timestamp = now

                if db_entry is None:
                    new_version = 0
                elif is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

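                # The metadata attached to each DMD entry looks roughly like
                # this (the values here are illustrative only):
                #
                #   {'version': 3,
                #    'last_downloaded_timestamp': 1449000000.0,
                #    'last_downloaded_uri': 'URI:CHK:...'}
                #
                # plus 'deleted': True for the deletion records written above.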
                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry is not None and db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                self.warn("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d


class WriteFileMixin(object):
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        # e.g. u"/home/alice/magic/foo.txt" -> u"/home/alice/magic/foo.txt.conflict"
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.
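        #
        # (A hedged reading of step 3: backdating the mtime by FUDGE_SECONDS
        # appears intended to keep the downloaded file's timestamp safely in
        # the past, so that a subsequent local modification is reliably seen
        # as new by is_new_file().)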

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)

        old_mask = os.umask(self._umask)
        try:
            fileutil.make_dirs(head, (~ self._umask) & 0777)
            fileutil.write(replacement_path_u, file_contents)
        finally:
            os.umask(old_mask)

        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            print "is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
        if os.path.isfile(replacement_path_u):
            print "%r exists" % (replacement_path_u,)
        if os.path.isfile(conflict_path_u):
            print "%r exists" % (conflict_path_u,)

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except OSError:
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u


class Downloader(QueueMixin, WriteFileMixin):
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode,
                 upload_readonly_dircap, clock, is_upload_pending, umask):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        self._upload_readonly_dircap = upload_readonly_dircap
        self._is_upload_pending = is_upload_pending
        self._umask = umask

    def start_scanning(self):
        self._log("start_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective(scan_self=True)
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        self._turn_deque()
        return d

    def stop(self):
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object
        should be downloaded. We check the remote metadata version against our
        magic-folder db version number; the latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        db_entry = self._db.get_db_entry(relpath_u)
        if db_entry is None:
            return True
        self._log("version %r" % (db_entry.version,))
        return (db_entry.version < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode relative path and checks whether the
        file exists on the local filesystem; if not, it returns None, otherwise
        it returns the version number recorded in our magic-folder db (or None
        if there is no db entry).
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        db_entry = self._db.get_db_entry(relpath_u)
        return None if db_entry is None else db_entry.version

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with a two-tuple containing
        the file node and metadata for the latest version of the file located in
        the magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            max_version = 0
            metadata = None
            node = None
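            # DeferredList fires with a list of (success, result) pairs; on
            # success, result is the (node, metadata) two-tuple returned by
            # get_child_and_metadata.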
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
        self._log("_scan_remote_dmd nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    if relpath_u in scan_batch:
                        scan_batch[relpath_u] += [(file_node, metadata)]
                    else:
                        scan_batch[relpath_u] = [(file_node, metadata)]

        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")
        return d

    def _scan_remote_collective(self, scan_self=False):
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)

            return d2
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                # pick the highest-version (node, metadata) pair advertised
                # for this path across all of the scanned DMDs
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    self._deque.append( (relpath_u, file_node, metadata) )
                else:
                    self._log("Excluding %r" % (relpath_u,))
                    self._call_hook(None, 'processed')

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
        return d

    def _when_queue_is_empty(self):
        # unlike the Uploader (which is driven by inotify events), the
        # Downloader polls: rescan the remote collective every
        # REMOTE_SCAN_INTERVAL seconds, then turn the deque again
        d = task.deferLater(self._clock, self.REMOTE_SCAN_INTERVAL, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        # Downloader
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
            is_conflict = False
            db_entry = self._db.get_db_entry(relpath_u)
            dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
            dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
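            # Conflict detection, roughly: if the remote DMD's record of the
            # last-downloaded (or last-uploaded) version does not match what
            # our db remembers, someone else changed the file concurrently
            # with a local change; likewise if we still have an upload of
            # this path pending.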
            if db_entry:
                if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')
                elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
                    is_conflict = True
                    self._count('objects_conflicted')
                elif self._is_upload_pending(relpath_u):
                    is_conflict = True
                    self._count('objects_conflicted')

            if relpath_u.endswith(u"/"):
                if metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: file_node.download_best_version())
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d