]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/frontends/magic_folder.py
Simplify _scan_remote_* and remove Downloader._download_scan_batch attribute.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / magic_folder.py
1
2 import sys, os
3 import os.path
4 from collections import deque
5 import time
6
7 from twisted.internet import defer, reactor, task
8 from twisted.python.failure import Failure
9 from twisted.python import runtime
10 from twisted.application import service
11
12 from allmydata.util import fileutil
13 from allmydata.interfaces import IDirectoryNode
14 from allmydata.util import log
15 from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
16 from allmydata.util.assertutil import precondition, _assert
17 from allmydata.util.deferredutil import HookMixin
18 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
19      extend_filepath, unicode_from_filepath, unicode_segments_from, \
20      quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
21 from allmydata.immutable.upload import FileName, Data
22 from allmydata import magicfolderdb, magicpath
23
24
25 IN_EXCL_UNLINK = 0x04000000L
26
def get_inotify_module():
    """
    Return the filesystem-notification module appropriate for this platform.

    On Windows this is allmydata.windows.inotify; on Linux (where Twisted
    reports inotify support) it is twisted.internet.inotify. Any other
    platform, or a failed import, raises NotImplementedError.
    """
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
            return inotify
        if runtime.platform.supportsINotify():
            from twisted.internet import inotify
            return inotify
        raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                  "This currently requires Linux or Windows.")
    except (ImportError, AttributeError) as e:
        # Log the underlying import problem, then report something actionable
        # on Windows (where the import can fail on pre-Vista systems).
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise
43
44
45 class MagicFolder(service.MultiService):
46     name = 'magic-folder'
47
48     def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
49                  pending_delay=1.0, clock=reactor):
50         precondition_abspath(local_path_u)
51
52         service.MultiService.__init__(self)
53
54         db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
55         if db is None:
56             return Failure(Exception('ERROR: Unable to load magic folder db.'))
57
58         # for tests
59         self._client = client
60         self._db = db
61
62         upload_dirnode = self._client.create_node_from_uri(upload_dircap)
63         collective_dirnode = self._client.create_node_from_uri(collective_dircap)
64
65         self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock)
66         self.downloader = Downloader(client, local_path_u, db, collective_dirnode, upload_dirnode.get_readonly_uri(), clock)
67
68     def startService(self):
69         # TODO: why is this being called more than once?
70         if self.running:
71             return defer.succeed(None)
72         print "%r.startService" % (self,)
73         service.MultiService.startService(self)
74         return self.uploader.start_monitoring()
75
76     def ready(self):
77         """ready is used to signal us to start
78         processing the upload and download items...
79         """
80         d = self.uploader.start_scanning()
81         d2 = self.downloader.start_scanning()
82         d.addCallback(lambda ign: d2)
83         return d
84
85     def finish(self):
86         print "finish"
87         d = self.uploader.stop()
88         d2 = self.downloader.stop()
89         d.addCallback(lambda ign: d2)
90         return d
91
92     def remove_service(self):
93         return service.MultiService.disownServiceParent(self)
94
95
class QueueMixin(HookMixin):
    """
    Shared machinery for Uploader and Downloader: a work deque of items to
    process, a '_lazy_tail' deferred chain that serializes processing, and
    stats/logging helpers.

    Subclasses must provide: _process(item), _when_queue_is_empty(), and
    (for _append_to_deque) a '_pending' set and an 'is_ready' flag.
    """
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name          # 'uploader' or 'downloader'; used in counters/logs
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        # Work queue of items; _turn_deque pops from the right.
        self._deque = deque()
        # All processing is chained onto this deferred, one item at a time.
        self._lazy_tail = defer.succeed(None)
        self._stopped = False
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        """Map a u'/'-separated relative path to a FilePath under the local folder."""
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        """Inverse of _get_filepath: FilePath -> u'/'-joined relative path."""
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        """Bump a stats counter namespaced as magic_folder.<name>.<counter>."""
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        # Pass-through logging callback for deferred chains.
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _append_to_deque(self, relpath_u):
        """
        Queue relpath_u for processing unless it is already pending or should
        be ignored. NOTE(review): relies on self._pending and self.is_ready,
        which only Uploader visibly defines — confirm Downloader never calls this.
        """
        self._log("_append_to_deque(%r)" % (relpath_u,))
        if relpath_u in self._pending or magicpath.should_ignore_file(relpath_u):
            return
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            self._clock.callLater(0, self._turn_deque)

    def _turn_deque(self):
        """
        Take one item off the deque and chain its processing onto _lazy_tail;
        when empty, chain _when_queue_is_empty instead. Re-schedules itself
        after _turn_delay seconds so items are processed one at a time.
        """
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            # 'processed' hook fires for both success and failure, then errors
            # are logged so the chain keeps turning.
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
173
174
class Uploader(QueueMixin):
    """
    Watches the local magic folder with inotify and uploads created, changed,
    and deleted objects to this client's upload DMD, recording each uploaded
    version in the magic-folder db.
    """
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        # Queue processing is gated until start_scanning() flips this to True.
        self.is_ready = False

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()
        # relpaths queued (or about to be queued) for upload; _process removes
        # each entry as it handles it.
        self._pending = set()

        # Not every notifier implementation supports a coalescing delay.
        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        """Begin reading inotify events; fires the 'started' hook."""
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        """Stop inotify monitoring and wait for in-flight processing to finish."""
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        # Chain onto the lazy tail so we wait for the current item too.
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        """
        Mark the uploader ready, scan the whole local folder, then queue any
        db-known paths that the scan did not encounter (typically files that
        were deleted while we were not running).
        """
        self._log("start_scanning")
        self.is_ready = True
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending))
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already processed
            # (normally because they have been deleted on disk).
            self._log("adding %r" % (self._pending))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
        """
        Recursively process every child of the local directory reldir_u
        (u"" means the folder root). Raises on unlistable directories.
        """
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            _assert(isinstance(child, unicode), child=child)
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def _notify(self, opaque, path, events_mask):
        """inotify callback: translate a filesystem event into a queued relpath."""
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring inotify event for creation of file %r\n" % (path,))
            return

        relpath_u = self._get_relpath(path)
        self._append_to_deque(relpath_u)

    def _when_queue_is_empty(self):
        # The uploader is purely event-driven; nothing to do on an empty queue.
        return defer.succeed(None)

    def _process(self, relpath_u):
        # Uploader
        """
        Upload one object. Handles four cases: the path has disappeared
        (upload a 'deleted' marker if the db knew it), it is a symlink
        (warn and skip), a directory (create remote subdir, watch it, and
        recurse), or a regular file (upload a new version if the db says it
        changed). Returns a deferred.
        """
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            # The path is no longer pending once we start handling it.
            self._log("about to remove %r from pending set %r" %
                      (relpath_u, self._pending))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')
                if not self._db.check_file_db_exists(relpath_u):
                    # Never uploaded, nothing to mark deleted.
                    return None

                last_downloaded_timestamp = now
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                # A deletion is represented remotely as an empty file with
                # 'deleted': True in its metadata.
                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                # NOTE(review): self.warn is not defined in this file — confirm
                # it exists on a base class or mixin.
                self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                # A remote directory entry is an empty file whose magic-encoded
                # name ends with the encoded u"/" separator.
                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u =  %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                last_downloaded_timestamp = now

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                # NOTE(review): see warn note above.
                self.warn("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        """Return a deferred for the metadata of a remote entry, or a Failure on KeyError."""
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        """Return a deferred for the filenode of a remote entry, or a Failure on KeyError."""
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d
433
434
435 class WriteFileMixin(object):
436     FUDGE_SECONDS = 10.0
437
438     def _get_conflicted_filename(self, abspath_u):
439         return abspath_u + u".conflict"
440
441     def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
442         self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
443                   % (abspath_u, len(file_contents), is_conflict, now))
444
445         # 1. Write a temporary file, say .foo.tmp.
446         # 2. is_conflict determines whether this is an overwrite or a conflict.
447         # 3. Set the mtime of the replacement file to be T seconds before the
448         #    current local time.
449         # 4. Perform a file replacement with backup filename foo.backup,
450         #    replaced file foo, and replacement file .foo.tmp. If any step of
451         #    this operation fails, reclassify as a conflict and stop.
452         #
453         # Returns the path of the destination file.
454
455         precondition_abspath(abspath_u)
456         replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
457         backup_path_u = abspath_u + u".backup"
458         if now is None:
459             now = time.time()
460
461         # ensure parent directory exists
462         head, tail = os.path.split(abspath_u)
463         mode = 0777 # XXX
464         fileutil.make_dirs(head, mode)
465
466         fileutil.write(replacement_path_u, file_contents)
467         os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
468         if is_conflict:
469             print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
470             return self._rename_conflicted_file(abspath_u, replacement_path_u)
471         else:
472             try:
473                 fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
474                 return abspath_u
475             except fileutil.ConflictError:
476                 return self._rename_conflicted_file(abspath_u, replacement_path_u)
477
478     def _rename_conflicted_file(self, abspath_u, replacement_path_u):
479         self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))
480
481         conflict_path_u = self._get_conflicted_filename(abspath_u)
482         print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
483         if os.path.isfile(replacement_path_u):
484             print "%r exists" % (replacement_path_u,)
485         if os.path.isfile(conflict_path_u):
486             print "%r exists" % (conflict_path_u,)
487
488         fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
489         return conflict_path_u
490
491     def _rename_deleted_file(self, abspath_u):
492         self._log('renaming deleted file to backup: %s' % (abspath_u,))
493         try:
494             fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
495         except IOError:
496             # XXX is this the correct error?
497             self._log("Already gone: '%s'" % (abspath_u,))
498         return abspath_u
499
500
class Downloader(QueueMixin, WriteFileMixin):
    """
    Periodically scans the collective directory for versions of objects newer
    than what the local db records, downloads them, and writes them locally
    with conflict detection.
    """
    # Seconds between remote scans; small to facilitate tests.
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode, upload_readonly_dircap, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        # Our own DMD's readonly URI; used to skip our own entry when scanning.
        self._upload_readonly_dircap = upload_readonly_dircap

        self._turn_delay = self.REMOTE_SCAN_INTERVAL

    def start_scanning(self):
        """Kick off the first remote scan and start turning the queue."""
        self._log("start_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective()
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        # NOTE(review): _turn_deque is called before the scan's deferred
        # fires; the queue may be empty on this first turn — confirm intended.
        self._turn_deque()
        return d

    def stop(self):
        """Stop turning the queue; fires when in-flight processing finishes."""
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        v = self._db.get_local_file_version(relpath_u)
        self._log("v = %r" % (v,))
        return (v is None or v < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode path string checks to see if this file object
        exists in our magic-folder db; if not then return None
        else check for an entry in our magic-folder db and return the version number.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        return self._db.get_local_file_version(relpath_u)

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with the two tuple containing a
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            # consumeErrors: a participant missing the file is not fatal.
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            # Pick the (node, metadata) pair with the largest 'version';
            # returns (None, None) if no participant has the file.
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
        """
        List one participant's DMD and accumulate candidate (filenode, metadata)
        pairs for any entry newer than (or absent from) our local db into
        scan_batch (relpath -> list of candidates).
        """
        self._log("_scan_remote_dmd nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    if scan_batch.has_key(relpath_u):
                        scan_batch[relpath_u] += [(file_node, metadata)]
                    else:
                        scan_batch[relpath_u] = [(file_node, metadata)]

        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")
        return d

    def _scan_remote_collective(self):
        """
        Scan every other participant's DMD, then queue the highest-version
        candidate of each changed path for download.
        """
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                # Skip our own DMD; we only pull other participants' changes.
                if dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    d2.addCallback(lambda ign, dir_name=dir_name:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)

            return d2
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                # Of all participants' candidates, keep only the newest.
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    self._deque.append( (relpath_u, file_node, metadata) )
                else:
                    self._log("Excluding %r" % (relpath_u,))
                    self._count('objects_excluded')
                    self._call_hook(None, 'processed')

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
        return d

    def _when_queue_is_empty(self):
        # Idle: wait one scan interval, rescan the collective, then keep turning.
        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        # Downloader
        """
        Download one queued (relpath, filenode, metadata) item, write it
        locally (honoring deletion markers, directories, and conflicts), and
        record the new version in the db. Returns a deferred; ConflictError
        is trapped so the queue keeps turning.
        """
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            # An unresolved conflict already exists; refuse to touch the file.
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
            is_conflict = False
            if self._db.check_file_db_exists(relpath_u):
                # Conflict detection: if the remote's record of the last
                # downloaded version differs from ours, someone else changed
                # the file since we last synced it.
                dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                local_last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                print "metadata %r" % (metadata,)
                print "<<<<--- if %r != %r" % (dmd_last_downloaded_uri, local_last_downloaded_uri)
                if dmd_last_downloaded_uri is not None and local_last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != local_last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')

                #dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
                #local_last_uploaded_uri = ...

            if relpath_u.endswith(u"/"):
                # Trailing slash marks a directory entry.
                if metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: file_node.download_best_version())
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            # A conflicted item is parked, not retried; swallow the error.
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d