import sys, os
import os.path
from collections import deque
import time

from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import HookMixin
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
     extend_filepath, unicode_from_filepath, unicode_segments_from, \
     quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
from allmydata import magicfolderdb, magicpath

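# The Linux inotify IN_EXCL_UNLINK flag (value from <sys/inotify.h>), which
# suppresses events for children after they have been unlinked from the
# watched directory. Twisted's inotify module does not define it, so we
# hard-code it here.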
IN_EXCL_UNLINK = 0x04000000L

def get_inotify_module():
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise


class MagicFolder(service.MultiService):
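    """Top-level service for one magic folder: it owns an Uploader and a
    Downloader and starts, readies, and stops them together."""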
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
                 pending_delay=1.0, clock=reactor):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            # Returning a Failure from __init__ would be a no-op (and a
            # TypeError); raise so the caller actually sees the error.
            raise Exception('ERROR: Unable to load magic folder db.')

        # for tests
        self._client = client
        self._db = db

        self.is_ready = False

        self.uploader = Uploader(client, local_path_u, db, upload_dircap, pending_delay, clock)
        self.downloader = Downloader(client, local_path_u, db, collective_dircap, clock)

    def startService(self):
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        print "%r.startService" % (self,)
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """ready is used to signal us to start
        processing the upload and download queues.
        """
        self.is_ready = True
        d = self.uploader.start_scanning()
        d2 = self.downloader.start_scanning()
        d.addCallback(lambda ign: d2)
        return d

    def finish(self):
        print "finish"
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        return service.MultiService.disownServiceParent(self)


class QueueMixin(HookMixin):
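    """Shared machinery for Uploader and Downloader: a deque of pending
    items, processed one per turn on the supplied clock and chained on
    self._lazy_tail so that turns never overlap, plus 'started' and
    'processed' hooks for the test suite."""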
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        self._lazy_tail = defer.succeed(None)
        self._pending = set()
        self._stopped = False
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _append_to_deque(self, relpath_u):
        self._log("_append_to_deque(%r)" % (relpath_u,))
        if relpath_u in self._pending or magicpath.should_ignore_file(relpath_u):
            return
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            self._clock.callLater(0, self._turn_deque)

    def _turn_deque(self):
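        """Pop one item off the deque and process it, chaining the work on
        self._lazy_tail so that turns are serialized; when the deque is
        empty, fall back to _when_queue_is_empty()."""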
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))


class Uploader(QueueMixin):
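    """Watches the local directory via the platform's inotify equivalent and
    uploads created, changed, and deleted objects to the (writeable) upload
    dirnode."""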
    def __init__(self, client, local_path_u, db, upload_dircap, pending_delay, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False

        # TODO: allow a path rather than a cap URI.
        self._upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        if not IDirectoryNode.providedBy(self._upload_dirnode):
            raise AssertionError("The URI in 'private/magic_folder_dircap' does not refer to a directory.")
        if self._upload_dirnode.is_unknown() or self._upload_dirnode.is_readonly():
            raise AssertionError("The URI in 'private/magic_folder_dircap' is not a writecap to a directory.")

        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF, IN_MOVED_FROM, or IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        self._log("start_scanning")
        self.is_ready = True
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending,))
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already processed
            # (normally because they have been deleted on disk).
            self._log("adding %r" % (self._pending,))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
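        """List the local directory reldir_u, add each non-ignored child to
        self._pending, and process it immediately."""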
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            assert isinstance(child, unicode), child
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def _notify(self, opaque, path, events_mask):
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring inotify event for creation of file %r\n" % (path,))
            return

        relpath_u = self._get_relpath(path)
        self._append_to_deque(relpath_u)

    def _when_queue_is_empty(self):
        return defer.succeed(None)

    def _process(self, relpath_u):
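        """Upload the object at relpath_u, distinguishing four cases: the
        object disappeared (record a deletion), it is a symlink (warn and
        skip), it is a directory (create it remotely, watch it, and scan it),
        or it is a regular file (upload a new version if the db says it
        changed)."""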
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("pending = %r, about to remove %r" % (self._pending, relpath_u))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')
                if not self._db.check_file_db_exists(relpath_u):
                    return None

                last_downloaded_timestamp = now
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                else:
                    new_version = current_version + 1

                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                self._log("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)
                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                last_downloaded_timestamp = now

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                self._log("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%r while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d


class WriteFileMixin(object):
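    """Helper for writing a downloaded file to disk: write a .tmp file,
    backdate its mtime by FUDGE_SECONDS, then replace the target, diverting
    to a .conflict file if the replacement cannot be done cleanly."""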
    FUDGE_SECONDS = 10.0

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)
        mode = 0777 # XXX
        fileutil.make_dirs(head, mode)

        fileutil.write(replacement_path_u, file_contents)
        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = abspath_u + u".conflict"
        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u


class Downloader(QueueMixin, WriteFileMixin):
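    """Periodically scans every participant's directory in the (read-only)
    collective dirnode and downloads any object whose remote version is
    newer than the one recorded in the local magic-folder db."""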
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dircap, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        # TODO: allow a path rather than a cap URI.
        self._collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        if not IDirectoryNode.providedBy(self._collective_dirnode):
            raise AssertionError("The URI in 'private/collective_dircap' does not refer to a directory.")
        if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly():
            raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.")

        self._turn_delay = self.REMOTE_SCAN_INTERVAL
        self._download_scan_batch = {} # path -> [(filenode, metadata)]

    def start_scanning(self):
        self._log("start_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective()
        self._turn_deque()
        return d

    def stop(self):
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether a remote object should be
        downloaded. We check the remote metadata version against the version number
        in our magic-folder db; the latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        v = self._db.get_local_file_version(relpath_u)
        self._log("v = %r" % (v,))
        return (v is None or v < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode path string and checks whether the file it
        refers to exists locally; if not, it returns None, otherwise it returns the
        version number recorded for it in our magic-folder db.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        return self._db.get_local_file_version(relpath_u)

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with a two-tuple containing the
        file node and metadata for the latest version of that file in the magic-folder
        collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _append_to_batch(self, name, file_node, metadata):
        if name in self._download_scan_batch:
            self._download_scan_batch[name] += [(file_node, metadata)]
        else:
            self._download_scan_batch[name] = [(file_node, metadata)]

    def _scan_remote(self, nickname, dirnode):
        self._log("_scan_remote nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))
                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    self._append_to_batch(relpath_u, file_node, metadata)
        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote")
        return d

    def _scan_remote_collective(self):
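        """List the collective dirnode, scan each participant's directory
        for objects newer than our local versions, and extend the download
        queue with the filtered batch."""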
        self._log("_scan_remote_collective")
        self._download_scan_batch = {} # XXX

        if self._collective_dirnode is None:
            return
        collective_dirmap_d = self._collective_dirnode.list()
        def do_list(result):
            others = [x for x in result.keys()]
            return result, others
        collective_dirmap_d.addCallback(do_list)
        def scan_collective(result):
            d = defer.succeed(None)
            collective_dirmap, others_list = result
            for dir_name in others_list:
                d.addCallback(lambda x, dir_name=dir_name: self._scan_remote(dir_name, collective_dirmap[dir_name][0]))
                # XXX todo add errback
            return d
        collective_dirmap_d.addCallback(scan_collective)
        collective_dirmap_d.addCallback(self._filter_scan_batch)
        collective_dirmap_d.addCallback(self._add_batch_to_download_queue)
        return collective_dirmap_d

    def _add_batch_to_download_queue(self, result):
        self._log("result = %r" % (result,))
        self._log("deque = %r" % (self._deque,))
        self._deque.extend(result)
        self._log("deque after = %r" % (self._deque,))
        self._count('objects_queued', len(result))
        self._log("pending = %r" % (self._pending,))
        self._pending.update(map(lambda x: x[0], result))
        self._log("pending after = %r" % (self._pending,))

    def _filter_scan_batch(self, result):
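        """For each path in the scan batch that is not already pending, pick
        the (file_node, metadata) pair with the highest version and keep it
        if _should_download approves."""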
        self._log("_filter_scan_batch")
        extension = [] # consider whether this should be a dict
        for relpath_u in self._download_scan_batch.keys():
            if relpath_u in self._pending:
                continue
            file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version'])
            if self._should_download(relpath_u, metadata['version']):
                extension += [(relpath_u, file_node, metadata)]
        return extension

    def _when_queue_is_empty(self):
        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
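        """Download one queued item: create the directory locally if
        relpath_u ends in '/', otherwise download the best version and write
        it to disk; then record the new version in the magic-folder db and
        drop the path from self._pending."""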
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)

        d = defer.succeed(None)
        if relpath_u.endswith(u"/"):
            self._log("mkdir(%r)" % (abspath_u,))
            d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
            d.addCallback(lambda ign: abspath_u)
        else:
            d.addCallback(lambda ign: file_node.download_best_version())
            d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents, is_conflict=False))

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)
            if not written_pathinfo.exists:
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f
        d.addCallbacks(do_update_db, failed)
        def remove_from_pending(res):
            self._pending.remove(relpath_u)
            return res
        d.addBoth(remove_from_pending)
        return d