import sys, os
import os.path
import shutil
from collections import deque
import time

from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import HookMixin
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
     extend_filepath, unicode_from_filepath, unicode_segments_from, \
     quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
from allmydata import magicfolderdb, magicpath

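# IN_EXCL_UNLINK (Linux >= 2.6.36) suppresses inotify events for children
# after they have been unlinked from the watched directory. Twisted's
# inotify module does not define this flag, so it is given by value here.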
IN_EXCL_UNLINK = 0x04000000L

def get_inotify_module():
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise

class MagicFolder(service.MultiService):
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
                 pending_delay=1.0, clock=reactor):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            raise Exception('ERROR: Unable to load magic folder db.')

        # for tests
        self._client = client
        self._db = db

        self.is_ready = False

        self.uploader = Uploader(client, local_path_u, db, upload_dircap, pending_delay, clock)
        self.downloader = Downloader(client, local_path_u, db, collective_dircap, clock)

    def startService(self):
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        print "%r.startService" % (self,)
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """ready is used to signal us to start processing the upload and
        download items.
        """
        self.is_ready = True
        d = self.uploader.start_scanning()
        d2 = self.downloader.start_scanning()
        d.addCallback(lambda ign: d2)
        return d

    def finish(self):
        print "finish"
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        return service.MultiService.disownServiceParent(self)

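# A minimal usage sketch (the caps, local path, and db filename here are
# hypothetical; in Tahoe-LAFS the client wires this up from tahoe.cfg):
#
#   mf = MagicFolder(client, upload_dircap, collective_dircap,
#                    u"/home/alice/magic", "private/magicfolderdb.sqlite")
#   mf.setServiceParent(client)  # runs startService(), starting inotify
#   mf.ready()                   # kick off the initial upload/download scans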

class QueueMixin(HookMixin):
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        self._lazy_tail = defer.succeed(None)
        self._pending = set()
        self._stopped = False
        self._turn_delay = 0
        self.is_ready = False  # _append_to_deque consults this, so always define it

    def _get_filepath(self, relpath_u):
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _append_to_deque(self, relpath_u):
        self._log("_append_to_deque(%r)" % (relpath_u,))
        if relpath_u in self._pending or magicpath.should_ignore_file(relpath_u):
            return
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            self._clock.callLater(0, self._turn_deque)

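    # Queue-draining pattern: each turn pops one item and chains its
    # processing onto _lazy_tail, so items are processed strictly one at a
    # time; _turn_delay (0 for the uploader, REMOTE_SCAN_INTERVAL for the
    # downloader) spaces consecutive turns apart.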
    def _turn_deque(self):
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))

class Uploader(QueueMixin):
    def __init__(self, client, local_path_u, db, upload_dircap, pending_delay, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False

        # TODO: allow a path rather than a cap URI.
        self._upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        if not IDirectoryNode.providedBy(self._upload_dirnode):
            raise AssertionError("The URI in 'private/magic_folder_dircap' does not refer to a directory.")
        if self._upload_dirnode.is_unknown() or self._upload_dirnode.is_readonly():
            raise AssertionError("The URI in 'private/magic_folder_dircap' is not a writecap to a directory.")

        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF, IN_MOVED_FROM, or IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
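        # recursive=True makes the notifier walk the tree and add a watch on
        # every existing subdirectory up front; directories created later are
        # watched explicitly in the isdir branch of _process below.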
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        self._log("start_scanning")
        self.is_ready = True
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending,))
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already
            # processed (normally because they have been deleted on disk).
            self._log("adding %r" % (self._pending,))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            assert isinstance(child, unicode), child
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def _notify(self, opaque, path, events_mask):
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring inotify event for creation of file %r\n" % (path,))
            return

        relpath_u = self._get_relpath(path)
        self._append_to_deque(relpath_u)

    def _when_queue_is_empty(self):
        return defer.succeed(None)

    def _process(self, relpath_u):
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("pending = %r, about to remove %r" % (self._pending, relpath_u))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)
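            # path2magic flattens the relative path into a single child name
            # so the whole local tree is stored in one flat Tahoe directory;
            # magic2path on the downloader side inverts the encoding.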

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')
                if not self._db.check_file_db_exists(relpath_u):
                    return None

                last_downloaded_timestamp = now
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    self._log("ignoring {}".format(relpath_u))
                    return

                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                self._log("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)
                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                last_downloaded_timestamp = now

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                self._log("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%r while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d


class WriteFileMixin(object):
    FUDGE_SECONDS = 10.0
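    # The mtime of a freshly-downloaded file is set FUDGE_SECONDS in the past
    # (see os.utime below), presumably so that a local modification made
    # immediately after the download still registers as strictly newer.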

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)
        mode = 0777 # XXX
        fileutil.make_dirs(head, mode)

        fileutil.write(replacement_path_u, file_contents)
        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = abspath_u + u".conflict"
        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u


class Downloader(QueueMixin, WriteFileMixin):
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dircap, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        # TODO: allow a path rather than a cap URI.
        self._collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        if not IDirectoryNode.providedBy(self._collective_dirnode):
            raise AssertionError("The URI in 'private/collective_dircap' does not refer to a directory.")
        if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly():
            raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.")

        self._turn_delay = self.REMOTE_SCAN_INTERVAL
        self._download_scan_batch = {} # path -> [(filenode, metadata)]

    def start_scanning(self):
        self._log("start_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective()
        self._turn_deque()
        return d

    def stop(self):
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object
        should be downloaded. We check the remote metadata version against our
        magic-folder db version number; the latest version wins.
        """
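        # Examples: local version None, remote 2 -> download; local 1,
        # remote 2 -> download; local 2, remote 2 -> skip; local 3,
        # remote 2 -> skip (our copy is already as new or newer).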
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignored; not downloading")
            return False
        self._log("not ignored; checking local version")
        v = self._db.get_local_file_version(relpath_u)
        self._log("v = %r" % (v,))
        return (v is None or v < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode relative path, checks whether a file
        exists at that location on disk, and if so returns its version number
        from our magic-folder db; otherwise it returns None.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        return self._db.get_local_file_version(relpath_u)

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed
        by magic-folder and returns a deferred that fires with a two-tuple of the
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _append_to_batch(self, name, file_node, metadata):
        self._download_scan_batch.setdefault(name, []).append((file_node, metadata))

    def _scan_remote(self, nickname, dirnode):
        self._log("_scan_remote nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))
                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    self._append_to_batch(relpath_u, file_node, metadata)
        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote")
        return d

    def _scan_remote_collective(self):
        self._log("_scan_remote_collective")
        self._download_scan_batch = {} # XXX

        if self._collective_dirnode is None:
            return defer.succeed(None)
        collective_dirmap_d = self._collective_dirnode.list()
        def do_list(result):
            others = list(result.keys())
            return result, others
        collective_dirmap_d.addCallback(do_list)
        def scan_collective(result):
            d = defer.succeed(None)
            collective_dirmap, others_list = result
            for dir_name in others_list:
                d.addCallback(lambda x, dir_name=dir_name: self._scan_remote(dir_name, collective_dirmap[dir_name][0]))
                # XXX todo add errback
            return d
        collective_dirmap_d.addCallback(scan_collective)
        collective_dirmap_d.addCallback(self._filter_scan_batch)
        collective_dirmap_d.addCallback(self._add_batch_to_download_queue)
        return collective_dirmap_d

    def _add_batch_to_download_queue(self, result):
        self._log("result = %r" % (result,))
        self._log("deque = %r" % (self._deque,))
        self._deque.extend(result)
        self._log("deque after = %r" % (self._deque,))
        self._count('objects_queued', len(result))
        self._log("pending = %r" % (self._pending,))
        self._pending.update(map(lambda x: x[0], result))
        self._log("pending after = %r" % (self._pending,))

    def _filter_scan_batch(self, result):
        self._log("_filter_scan_batch")
        extension = [] # consider whether this should be a dict
        for relpath_u in self._download_scan_batch.keys():
            if relpath_u in self._pending:
                continue
            file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version'])
            if self._should_download(relpath_u, metadata['version']):
                extension += [(relpath_u, file_node, metadata)]
            else:
                self._log("Excluding '{0}'".format(relpath_u))
                self._count('objects_excluded')
                self._call_hook(None, 'processed')
        return extension

    def _when_queue_is_empty(self):
        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)

        d = defer.succeed(None)
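        # A relpath ending in "/" marks a directory entry: the uploader
        # appends an escaped "/" to the flattened name when it uploads a
        # subdirectory (see Uploader._process).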
        if relpath_u.endswith(u"/"):
            self._log("mkdir(%r)" % (abspath_u,))
            d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
            d.addCallback(lambda ign: abspath_u)
        else:
            d.addCallback(lambda ign: file_node.download_best_version())
            if metadata.get('deleted', False):
                d.addCallback(lambda result: self._unlink_deleted_file(abspath_u, result))
            else:
                d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents, is_conflict=False))

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)
            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f
        d.addCallbacks(do_update_db, failed)
        def remove_from_pending(res):
            self._pending.remove(relpath_u)
            return res
        d.addBoth(remove_from_pending)
        return d

    def _unlink_deleted_file(self, abspath_u, result):
        try:
            self._log('unlinking: %s' % (abspath_u,))
            shutil.move(abspath_u, abspath_u + '.backup')
        except (IOError, OSError):
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u