]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/frontends/magic_folder.py
df26ebb8fffd795ca3a7b1ea2707ec17bb3d3670
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / magic_folder.py
1
2 import sys, os
3 import os.path
4 from collections import deque
5 import time
6
7 from twisted.internet import defer, reactor, task
8 from twisted.python.failure import Failure
9 from twisted.python import runtime
10 from twisted.application import service
11
12 from allmydata.util import fileutil
13 from allmydata.interfaces import IDirectoryNode
14 from allmydata.util import log
15 from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
16 from allmydata.util.assertutil import precondition, _assert
17 from allmydata.util.deferredutil import HookMixin
18 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
19      extend_filepath, unicode_from_filepath, unicode_segments_from, \
20      quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
21 from allmydata.immutable.upload import FileName, Data
22 from allmydata import magicfolderdb, magicpath
23
24
# Linux inotify flag IN_EXCL_UNLINK (see inotify(7)): suppress events for
# children after they have been unlinked from the watched directory.
# Hard-coded here because Twisted's inotify module does not expose it.
IN_EXCL_UNLINK = 0x04000000L
26
def get_inotify_module():
    """
    Return a filesystem-notification module suitable for this platform:
    the allmydata Windows shim on win32, or Twisted's inotify support
    where the platform provides it.

    Raises NotImplementedError on unsupported platforms, and re-raises
    (after logging) import/attribute failures from the backends.
    """
    unsupported_msg = ("filesystem notification needed for Magic Folder is not supported.\n"
                       "This currently requires Linux or Windows.")
    windows_msg = ("filesystem notification needed for Magic Folder is not supported.\n"
                   "Windows support requires at least Vista, and has only been tested on Windows 7.")
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError(unsupported_msg)
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            # The Windows backend exists but failed to load; most likely an
            # unsupported Windows version.
            raise NotImplementedError(windows_msg)
        raise
43
44
class MagicFolder(service.MultiService):
    """
    Service coupling an Uploader and a Downloader for one local
    magic-folder directory.
    """
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
                 pending_delay=1.0, clock=None):
        """
        :param client: the Tahoe client; used to create dirnodes from caps
            and (indirectly) for stats/logging.
        :param upload_dircap: writecap for this client's own upload DMD.
        :param collective_dircap: readcap for the collective directory.
        :param local_path_u: absolute unicode path of the local directory.
        :param dbfile: path of the magic-folder state database.
        :param pending_delay: inotify event-coalescing delay, in seconds.
        :param clock: test reactor; supplying one also makes the Uploader
            turn its queue immediately instead of via callLater.
        """
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        # A clock is only passed in by tests; that doubles as the signal to
        # process queue turns synchronously.
        immediate = clock is not None
        clock = clock or reactor
        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            # BUGFIX: this used to 'return Failure(...)'; returning a value
            # from __init__ only provokes an unrelated TypeError at runtime.
            # Raise the real error so the caller sees what went wrong.
            raise Exception('ERROR: Unable to load magic folder db.')

        # for tests
        self._client = client
        self._db = db

        upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
        self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
                                     upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending)

    def startService(self):
        """Start the service and begin inotify monitoring; idempotent."""
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        print("%r.startService" % (self,))
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """ready is used to signal us to start
        processing the upload and download items...
        """
        d = self.uploader.start_scanning()
        d2 = self.downloader.start_scanning()
        d.addCallback(lambda ign: d2)
        return d

    def finish(self):
        """Stop both queues; the returned Deferred fires when both have
        finished their in-flight work."""
        print("finish")
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        """Detach this service from its parent (MultiService bookkeeping)."""
        return service.MultiService.disownServiceParent(self)
97
98
class QueueMixin(HookMixin):
    """
    Machinery shared by Uploader and Downloader: a deque of work items, a
    'lazy tail' Deferred chain that serializes item processing, stats
    counters, and logging helpers.

    Subclasses must provide _process(item) and _when_queue_is_empty().
    """
    def __init__(self, client, local_path_u, db, name, clock):
        # `name` is 'uploader' or 'downloader'; it appears in log lines and
        # in stats counter names ('magic_folder.<name>.<counter>').
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        # HookMixin hooks: 'processed' fires after each queue item is
        # handled; 'started' fires when monitoring begins.
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        # All processing is chained onto this Deferred so that items are
        # handled strictly one at a time.
        self._lazy_tail = defer.succeed(None)
        self._stopped = False
        # Seconds between queue turns; Downloader overrides this with its
        # remote-scan interval.
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        """Map a u'/'-separated relative path to a FilePath under the local dir."""
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        """Inverse of _get_filepath: FilePath -> u'/'-separated relative path."""
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        # Record a stats delta under 'magic_folder.<name>.<counter_name>'.
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        # Pass-through callback: log `msg` alongside the Deferred result.
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _turn_deque(self):
        # Take one item and chain its processing onto the lazy tail, then
        # schedule the next turn after _turn_delay.  Note that pop() takes
        # the most recently queued item (LIFO order).  On empty, defer to
        # the subclass's _when_queue_is_empty().
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
166
167
class Uploader(QueueMixin):
    """
    Watches the local directory via inotify and uploads created, changed,
    and deleted files into this client's upload DMD.
    """
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
                 immediate=False):
        """
        :param upload_dirnode: writeable dirnode for this client's DMD.
        :param pending_delay: event-coalescing delay for notifiers that
            support it.
        :param immediate: tests-only; turn the deque synchronously instead
            of via clock.callLater.
        """
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False
        self._immediate = immediate

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()
        # Relative paths that have been noticed but not yet processed.
        self._pending = set()

        # Only some notifier implementations support tuning the coalescing
        # delay (e.g. the Windows shim).
        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        """Begin reading inotify events; fires the 'started' hook."""
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        """Stop watching; the returned Deferred fires once the notifier has
        stopped and in-flight processing (the lazy tail) has drained."""
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        """Scan the whole local directory once, then start turning the
        queue.  Entries known to the db but no longer on disk are queued
        too, so that their deletion gets propagated."""
        self._log("start_scanning")
        self.is_ready = True
        # Seed the pending set with everything the db knows about; _scan
        # and _process remove entries as they are handled.
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending))
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already processed
            # (normally because they have been deleted on disk).
            self._log("adding %r" % (self._pending))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
        """Recursively process every child of reldir_u (u"" is the root).

        Raises on unlistable directories (permissions or filename-encoding
        problems)."""
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            _assert(isinstance(child, unicode), child=child)
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def is_pending(self, relpath_u):
        """Return True if relpath_u is queued for (or undergoing) upload.

        Used by the Downloader to detect conflicts.  BUGFIX: the 'self'
        parameter was missing, so any call of this method raised TypeError.
        """
        return relpath_u in self._pending

    def _notify(self, opaque, path, events_mask):
        """inotify callback: queue the changed path for upload unless the
        event or path should be ignored."""
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
            return
        if relpath_u in self._pending:
            self._log("ignoring event for %r (already pending)" % (relpath_u,))
            return
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
            return

        self._log("appending %r to deque" % (relpath_u,))
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            if self._immediate:  # for tests
                self._turn_deque()
            else:
                self._clock.callLater(0, self._turn_deque)

    def _when_queue_is_empty(self):
        # The Uploader is purely event-driven; nothing to do between events.
        return defer.succeed(None)

    def _process(self, relpath_u):
        """Upload one queued path: a deletion marker if it vanished, a
        subdirectory entry (plus recursive scan) if it is a directory, or
        the file contents if it is a regular file."""
        # Uploader
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("about to remove %r from pending set %r" %
                      (relpath_u, self._pending))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')
                if not self._db.check_file_db_exists(relpath_u):
                    # We never uploaded it, so there is nothing to delete remotely.
                    return None

                last_downloaded_timestamp = now
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                # Propagate the deletion as an empty file with 'deleted' metadata.
                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                # NOTE(review): self.warn is not defined in this file —
                # confirm it is provided by a mixin/base class.
                self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                # Some notifiers watch new subdirectories automatically;
                # otherwise add a watch explicitly.
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u =  %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                # Recurse into the new directory so its contents get uploaded.
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                last_downloaded_timestamp = now

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                # Special file (device, fifo, ...): refuse to upload.
                self.warn("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        """Return (a Deferred for) the DMD metadata of encoded_path_u.

        NOTE(review): assumes get_metadata_for may raise KeyError
        synchronously — confirm against the dirnode implementation."""
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        """Return (a Deferred for) the filenode of encoded_path_u in our DMD."""
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d
445
446
class WriteFileMixin(object):
    """
    Helpers for atomically installing downloaded file contents and for
    renaming files involved in conflicts or deletions.
    """
    # Replacement files get their mtime set this many seconds in the past,
    # so a subsequent local edit is seen as newer.
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        # A conflicted copy lives next to the original under this suffix.
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        """
        Install downloaded contents at abspath_u and return the path that
        was actually written (the .conflict path on conflict).

        Steps:
          1. Write a temporary file, say .foo.tmp.
          2. is_conflict determines whether this is an overwrite or a conflict.
          3. Set the mtime of the replacement file to be T seconds before the
             current local time.
          4. Perform a file replacement with backup filename foo.backup,
             replaced file foo, and replacement file .foo.tmp. If any step of
             this operation fails, reclassify as a conflict and stop.
        """
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        precondition_abspath(abspath_u)
        tmp_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # Make sure the parent directory exists before writing into it.
        parent_dir_u = os.path.split(abspath_u)[0]
        fileutil.make_dirs(parent_dir_u, 0o777)  # XXX

        fileutil.write(tmp_path_u, file_contents)
        # Backdate the replacement's mtime (step 3 above).
        os.utime(tmp_path_u, (now, now - self.FUDGE_SECONDS))

        if is_conflict:
            print("0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, tmp_path_u))
            return self._rename_conflicted_file(abspath_u, tmp_path_u)
        try:
            fileutil.replace_file(abspath_u, tmp_path_u, backup_path_u)
        except fileutil.ConflictError:
            # The replacement raced with a local change: reclassify.
            return self._rename_conflicted_file(abspath_u, tmp_path_u)
        return abspath_u

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        """Move the replacement into place under the .conflict name and
        return that conflict path."""
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        # Debug output retained from the original implementation.
        print("XXX rename %r %r" % (replacement_path_u, conflict_path_u))
        for candidate_u in (replacement_path_u, conflict_path_u):
            if os.path.isfile(candidate_u):
                print("%r exists" % (candidate_u,))

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        """Rename a remotely-deleted file to its .backup name; returns
        abspath_u whether or not the rename succeeded."""
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except IOError:
            # XXX is this the correct error?
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u
511
512
class Downloader(QueueMixin, WriteFileMixin):
    """
    Periodically scans the other participants' DMDs in the collective and
    downloads files whose remote version is newer than our local version.
    """
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode,
                 upload_readonly_dircap, clock, is_upload_pending):
        """
        :param collective_dirnode: read-only dirnode of the collective.
        :param upload_readonly_dircap: our own DMD's readcap, used to skip
            scanning ourselves.
        :param is_upload_pending: callable(relpath_u) -> bool from the
            Uploader, used for conflict detection.
        """
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        self._upload_readonly_dircap = upload_readonly_dircap
        self._is_upload_pending = is_upload_pending

        # Re-scan the collective this often when the queue runs dry.
        self._turn_delay = self.REMOTE_SCAN_INTERVAL

    def start_scanning(self):
        """Do an initial remote scan and start turning the download queue."""
        self._log("start_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective()
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        self._turn_deque()
        return d

    def stop(self):
        """Stop turning the queue; fires when in-flight work has drained."""
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        v = self._db.get_local_file_version(relpath_u)
        self._log("v = %r" % (v,))
        return (v is None or v < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode path string checks to see if this file object
        exists in our magic-folder db; if not then return None
        else check for an entry in our magic-folder db and return the version number.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        return self._db.get_local_file_version(relpath_u)

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with the two tuple containing a
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            # Pick the (node, metadata) pair with the largest 'version';
            # failed lookups (success == False) are simply skipped.
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
        """List one participant's DMD and record, into scan_batch, every
        entry whose remote version beats our local version."""
        self._log("_scan_remote_dmd nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    # (idiom fix: was scan_batch.has_key(relpath_u))
                    if relpath_u in scan_batch:
                        scan_batch[relpath_u] += [(file_node, metadata)]
                    else:
                        scan_batch[relpath_u] = [(file_node, metadata)]

        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")
        return d

    def _scan_remote_collective(self):
        """Scan every other participant's DMD, then queue the newest
        version of each candidate path for download."""
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                # Skip our own DMD: we never download from ourselves.
                if dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)

            return d2
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    self._deque.append( (relpath_u, file_node, metadata) )
                    # BUGFIX: keep the objects_queued counter balanced —
                    # _turn_deque decrements it for every popped item, so
                    # it must be incremented on enqueue here too.
                    self._count('objects_queued')
                else:
                    self._log("Excluding %r" % (relpath_u,))
                    self._count('objects_excluded')
                    self._call_hook(None, 'processed')

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
        return d

    def _when_queue_is_empty(self):
        # Nothing queued: wait _turn_delay seconds, re-scan the collective,
        # then start turning the queue again.
        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        """Download one queued (relpath_u, file_node, metadata) item,
        detecting conflicts against our db and the pending-upload set, and
        record the result in the magic-folder db."""
        # Downloader
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            # A previous conflict is still unresolved; refuse to download.
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
            # Conflict detection: compare the remote DMD's notion of the
            # last downloaded/uploaded URIs against our db's.
            is_conflict = False
            if self._db.check_file_db_exists(relpath_u):
                dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                local_last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                print("metadata %r" % (metadata,))
                print("<<<<--- if %r != %r" % (dmd_last_downloaded_uri, local_last_downloaded_uri))
                if dmd_last_downloaded_uri is not None and local_last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != local_last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')
                    else:
                        dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
                        local_last_uploaded_uri = self._db.get_last_uploaded_uri(relpath_u)
                        print(">>>>  if %r != %r" % (dmd_last_uploaded_uri, local_last_uploaded_uri))
                        if dmd_last_uploaded_uri != local_last_uploaded_uri:
                            is_conflict = True
                            self._count('objects_conflicted')
                        else:
                            # XXX todo: mark as conflict if file is in pending upload set
                            if self._is_upload_pending(relpath_u):
                                is_conflict = True
                                self._count('objects_conflicted')

            if relpath_u.endswith(u"/"):
                # Trailing slash means a directory entry.
                if metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: file_node.download_best_version())
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            # Conflicts are recorded (via the .conflict file) rather than
            # propagated as errors.
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d