import sys, os
import os.path
from collections import deque
import time

from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import HookMixin
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
     extend_filepath, unicode_from_filepath, unicode_segments_from, \
     quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
from allmydata import magicfolderdb, magicpath

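# Linux inotify flag meaning "don't generate events for children after
# they have been unlinked from the watched directory". It is defined here
# because Twisted's inotify wrapper did not expose it at the time of writing.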
IN_EXCL_UNLINK = 0x04000000L

def get_inotify_module():
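    """
    Return a platform-appropriate filesystem-notification module: the
    allmydata Windows shim on win32, or Twisted's inotify support on
    Linux. Raises NotImplementedError on unsupported platforms.
    """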
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported on this platform.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported on this platform.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise


class MagicFolder(service.MultiService):
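    """
    Service wrapping an Uploader and a Downloader for a single magic
    folder: a local directory kept in sync with a Tahoe-LAFS collective
    directory.
    """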
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
                 pending_delay=1.0, clock=reactor):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            # Returning a Failure from __init__ would be silently ignored;
            # raise so the caller actually sees the error.
            raise Exception("ERROR: Unable to load magic folder db.")

        # for tests
        self._client = client
        self._db = db

        self.is_ready = False

        self.uploader = Uploader(client, local_path_u, db, upload_dircap, pending_delay, clock)
        self.downloader = Downloader(client, local_path_u, db, collective_dircap, clock)
    def startService(self):
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        print "%r.startService" % (self,)
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """Signal that we should start processing the upload and download
        queues. Returns a Deferred that fires when both the uploader and
        the downloader have finished their initial scans.
        """
        self.is_ready = True
        d = self.uploader.start_scanning()
        d2 = self.downloader.start_scanning()
        d.addCallback(lambda ign: d2)
        return d

    def finish(self):
        print "finish"
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        return service.MultiService.disownServiceParent(self)


class QueueMixin(HookMixin):
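    """
    Common machinery for Uploader and Downloader: a deque of pending
    items, drained one item at a time by chaining each step onto the
    _lazy_tail Deferred so that processing is strictly sequential.
    """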
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        self._lazy_tail = defer.succeed(None)
        self._pending = set()
        self._stopped = False
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _append_to_deque(self, relpath_u):
        self._log("_append_to_deque(%r)" % (relpath_u,))
        if relpath_u in self._pending or magicpath.should_ignore_file(relpath_u):
            return
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            self._clock.callLater(0, self._turn_deque)

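    # Pop one item per turn and chain its processing onto _lazy_tail, so
    # items are handled one at a time. When the deque empties,
    # _when_queue_is_empty() (provided by the subclass) decides whether to
    # stop or to schedule more work.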
    def _turn_deque(self):
        if self._stopped:
            return
        try:
            item = self._deque.pop()
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))


class Uploader(QueueMixin):
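    """
    Watches the local directory with inotify and uploads new, changed,
    and deleted files to the upload dirnode.
    """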
    def __init__(self, client, local_path_u, db, upload_dircap, pending_delay, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False

        # TODO: allow a path rather than a cap URI.
        self._upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        if not IDirectoryNode.providedBy(self._upload_dirnode):
            raise AssertionError("The URI in 'private/magic_folder_dircap' does not refer to a directory.")
        if self._upload_dirnode.is_unknown() or self._upload_dirnode.is_readonly():
            raise AssertionError("The URI in 'private/magic_folder_dircap' is not a writecap to a directory.")

        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF, IN_MOVED_FROM, or IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        self._log("start_scanning")
        self.is_ready = True
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending,))
        d = self._scan(u"")
        def _add_pending(ign):
            # After the scan, anything still in self._pending was in the db
            # but was not found on disk (normally because it has been
            # deleted); queue it so the deletion gets uploaded.
            self._log("adding %r" % (self._pending,))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            assert isinstance(child, unicode), child
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def _notify(self, opaque, path, events_mask):
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring inotify event for creation of file %r\n" % (path,))
            return

        relpath_u = self._get_relpath(path)
        self._append_to_deque(relpath_u)

    def _when_queue_is_empty(self):
        return defer.succeed(None)

    def _process(self, relpath_u):
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("pending = %r, about to remove %r" % (self._pending, relpath_u))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')
                if not self._db.check_file_db_exists(relpath_u):
                    return None

                last_downloaded_timestamp = now
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                else:
                    new_version = current_version + 1

                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                self._log("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)
                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                last_downloaded_timestamp = now

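                # Version numbers increase monotonically per file: a file
                # not yet in the db starts at 0, and a file whose contents
                # have changed since the last upload bumps the recorded
                # version by one.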
                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                self._log("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%r while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d


class WriteFileMixin(object):
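    """
    Helper for writing downloaded contents to the local filesystem,
    demoting a failed replacement to a .conflict file.
    """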
    FUDGE_SECONDS = 10.0

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.
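        #
        # Backdating the mtime in step 3 (by FUDGE_SECONDS) means that a
        # local write made shortly after this download will still move the
        # file's mtime forward, so it remains detectable as a new change.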

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)
        mode = 0777 # XXX
        fileutil.make_dirs(head, mode)

        fileutil.write(replacement_path_u, file_contents)
        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = abspath_u + u".conflict"
        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u


class Downloader(QueueMixin, WriteFileMixin):
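    """
    Periodically scans the remote collective for new versions of files
    and writes them into the local directory.
    """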
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dircap, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        # TODO: allow a path rather than a cap URI.
        self._collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        if not IDirectoryNode.providedBy(self._collective_dirnode):
            raise AssertionError("The URI in 'private/collective_dircap' does not refer to a directory.")
        if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly():
            raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.")

        self._turn_delay = self.REMOTE_SCAN_INTERVAL
        self._download_scan_batch = {} # path -> [(filenode, metadata)]

    def start_scanning(self):
        self._log("start_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective()
        self._turn_deque()
        return d

    def stop(self):
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        Return True if the remote object should be downloaded: we compare
        the remote metadata version against the version in our magic-folder
        db, and the latest version wins.
        """
        if magicpath.should_ignore_file(relpath_u):
            return False
        v = self._db.get_local_file_version(relpath_u)
        return (v is None or v < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        Take a unicode path relative to the magic folder and return the
        version number recorded in our magic-folder db, or None if the
        file does not exist locally.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        return self._db.get_local_file_version(relpath_u)

    def _get_collective_latest_file(self, filename):
        """
        Take a file path relative to the magic folder and return a
        Deferred that fires with a (file node, metadata) two-tuple for the
        latest version of that file in the magic-folder collective
        directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _append_to_batch(self, name, file_node, metadata):
        if name in self._download_scan_batch:
            self._download_scan_batch[name] += [(file_node, metadata)]
        else:
            self._download_scan_batch[name] = [(file_node, metadata)]

    def _scan_remote(self, nickname, dirnode):
        self._log("_scan_remote nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))
                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    self._append_to_batch(relpath_u, file_node, metadata)
        d.addCallback(scan_listing)
        return d

    def _scan_remote_collective(self):
        self._log("_scan_remote_collective")
        self._download_scan_batch = {} # XXX

        if self._collective_dirnode is None:
            return
        collective_dirmap_d = self._collective_dirnode.list()
        def do_list(result):
            others = list(result.keys())
            return result, others
        collective_dirmap_d.addCallback(do_list)
        def scan_collective(result):
            d = defer.succeed(None)
            collective_dirmap, others_list = result
            for dir_name in others_list:
                d.addCallback(lambda x, dir_name=dir_name: self._scan_remote(dir_name, collective_dirmap[dir_name][0]))
                # XXX todo add errback
            return d
        collective_dirmap_d.addCallback(scan_collective)
        collective_dirmap_d.addCallback(self._filter_scan_batch)
        collective_dirmap_d.addCallback(self._add_batch_to_download_queue)
        return collective_dirmap_d

    def _add_batch_to_download_queue(self, result):
        self._log("result = %r" % (result,))
        self._log("deque = %r" % (self._deque,))
        self._deque.extend(result)
        self._log("deque after = %r" % (self._deque,))
        self._count('objects_queued', len(result))
        self._log("pending = %r" % (self._pending,))
        self._pending.update(map(lambda x: x[0], result))
        self._log("pending after = %r" % (self._pending,))

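    # For each batched path, keep only the highest-version
    # (file_node, metadata) pair, and drop anything that is already
    # pending or that _should_download rejects.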
    def _filter_scan_batch(self, result):
        extension = [] # consider whether this should be a dict
        for relpath_u in self._download_scan_batch.keys():
            if relpath_u in self._pending:
                continue
            file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version'])
            if self._should_download(relpath_u, metadata['version']):
                extension += [(relpath_u, file_node, metadata)]
        return extension

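    # When the download queue drains, wait REMOTE_SCAN_INTERVAL seconds,
    # rescan the remote collective, and then turn the deque again.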
    def _when_queue_is_empty(self):
        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)

        d = defer.succeed(None)
        if relpath_u.endswith(u"/"):
            self._log("mkdir(%r)" % (abspath_u,))
            d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
            d.addCallback(lambda ign: abspath_u)
        else:
            d.addCallback(lambda ign: file_node.download_best_version())
            d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents, is_conflict=False))

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)
            if not written_pathinfo.exists:
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f
        d.addCallbacks(do_update_db, failed)
        def remove_from_pending(res):
            self._pending.remove(relpath_u)
            return res
        d.addBoth(remove_from_pending)
        return d