1 import sys, os
4 from collections import deque
7 from twisted.internet import defer, reactor, task
8 from twisted.python.failure import Failure
9 from twisted.python import runtime
10 from twisted.application import service
12 from allmydata.util import fileutil
13 from allmydata.interfaces import IDirectoryNode
14 from allmydata.util import log
15 from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
16 from allmydata.util.assertutil import precondition, _assert
17 from allmydata.util.deferredutil import HookMixin
18 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
19 extend_filepath, unicode_from_filepath, unicode_segments_from, \
20 quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
21 from allmydata.immutable.upload import FileName, Data
22 from allmydata import magicfolderdb, magicpath
25 IN_EXCL_UNLINK = 0x04000000L
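# (Note: 0x04000000 is the Linux inotify IN_EXCL_UNLINK flag value; it is defined
# here because the Twisted inotify module does not appear to export it by name,
# and it is OR'ed into the uploader's watch mask below so that events for
# already-unlinked files are suppressed.)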
27 def get_inotify_module():
29 if sys.platform == "win32":
30 from allmydata.windows import inotify
31 elif runtime.platform.supportsINotify():
32 from twisted.internet import inotify
34 raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
35 "This currently requires Linux or Windows.")
37 except (ImportError, AttributeError) as e:
39 if sys.platform == "win32":
40 raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
41 "Windows support requires at least Vista, and has only been tested on Windows 7.")
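# Illustrative usage of the selected backend (hypothetical paths, not code from
# this module): the returned module is used as a drop-in inotify-style API, e.g.
#
#     inotify = get_inotify_module()
#     notifier = inotify.INotify()
#     notifier.watch(to_filepath(u"/some/dir"), mask=inotify.IN_CLOSE_WRITE,
#                    callbacks=[lambda opaque, path, mask: None])
#     notifier.startReading()
#
# On Windows, allmydata.windows.inotify provides the same interface on top of
# native directory-change notifications.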
45 def is_new_file(pathinfo, db_entry):
49 if not pathinfo.exists and db_entry.size is None:
50 return False
52 return ((pathinfo.size, pathinfo.ctime, pathinfo.mtime) !=
53 (db_entry.size, db_entry.ctime, db_entry.mtime))
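# A minimal illustration of is_new_file(), assuming a db_entry-like object with
# size/ctime/mtime attributes (hypothetical values):
#
#     from collections import namedtuple
#     FakeEntry = namedtuple('FakeEntry', 'exists size ctime mtime')
#     old = FakeEntry(exists=True, size=10, ctime=1.0, mtime=1.0)
#     new = FakeEntry(exists=True, size=11, ctime=1.0, mtime=2.0)
#     is_new_file(new, old)   # True: size/mtime differ from what the db recorded
#     is_new_file(old, old)   # False: nothing changed since the last upload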
56 class MagicFolder(service.MultiService):
59 def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, umask,
60 pending_delay=1.0, clock=None):
61 precondition_abspath(local_path_u)
63 service.MultiService.__init__(self)
65 immediate = clock is not None
66 clock = clock or reactor
67 db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
68 if db is None:
69 return Failure(Exception('ERROR: Unable to load magic folder db.'))
75 upload_dirnode = self._client.create_node_from_uri(upload_dircap)
76 collective_dirnode = self._client.create_node_from_uri(collective_dircap)
78 self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
79 self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
80 upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending, umask)
82 def startService(self):
83 # TODO: why is this being called more than once?
84 if self.running:
85 return defer.succeed(None)
86 print "%r.startService" % (self,)
87 service.MultiService.startService(self)
88 return self.uploader.start_monitoring()
90 def ready(self):
91 """ready is used to signal us to start
92 processing the upload and download items...
93 """
94 self.uploader.start_uploading() # synchronous
95 return self.downloader.start_downloading()
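# Illustrative startup sequence (hypothetical caller code, not part of this
# module): a MagicFolder is started in two phases; startService() only begins
# filesystem monitoring, and ready() later kicks off the initial upload scan and
# the remote-collective polling loop.
#
#     mf = MagicFolder(client, upload_dircap, collective_dircap,
#                      local_path_u, dbfile, umask)
#     mf.setServiceParent(client)   # or call mf.startService() directly
#     d = mf.ready()                # begin uploading/downloading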
99 d = self.uploader.stop()
100 d2 = self.downloader.stop()
101 d.addCallback(lambda ign: d2)
104 def remove_service(self):
105 return service.MultiService.disownServiceParent(self)
108 class QueueMixin(HookMixin):
109 def __init__(self, client, local_path_u, db, name, clock):
110 self._client = client
111 self._local_path_u = local_path_u
112 self._local_filepath = to_filepath(local_path_u)
113 self._db = db
114 self._name = name
115 self._clock = clock
116 self._hooks = {'processed': None, 'started': None}
117 self.started_d = self.set_hook('started')
119 if not self._local_filepath.exists():
120 raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
121 "but there is no directory at that location."
122 % quote_local_unicode_path(self._local_path_u))
123 if not self._local_filepath.isdir():
124 raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
125 "but the thing at that location is not a directory."
126 % quote_local_unicode_path(self._local_path_u))
128 self._deque = deque()
129 self._lazy_tail = defer.succeed(None)
130 self._stopped = False
133 def _get_filepath(self, relpath_u):
134 self._log("_get_filepath(%r)" % (relpath_u,))
135 return extend_filepath(self._local_filepath, relpath_u.split(u"/"))
137 def _get_relpath(self, filepath):
138 self._log("_get_relpath(%r)" % (filepath,))
139 segments = unicode_segments_from(filepath, self._local_filepath)
140 self._log("segments = %r" % (segments,))
141 return u"/".join(segments)
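# Relpaths are stored with u"/" separators regardless of platform;
# _get_filepath()/_get_relpath() convert between that form and a FilePath rooted
# at the local magic folder.  For example (hypothetical values):
#
#     self._get_filepath(u"music/song.mp3")
#         # -> FilePath of <local_path_u>/music/song.mp3
#     self._get_relpath(self._get_filepath(u"music/song.mp3"))
#         # -> u"music/song.mp3"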
143 def _count(self, counter_name, delta=1):
144 ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
145 self._log("%s += %r" % (counter_name, delta))
146 self._client.stats_provider.count(ctr, delta)
148 def _logcb(self, res, msg):
149 self._log("%s: %r" % (msg, res))
152 def _log(self, msg):
153 s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
156 #open("events", "ab+").write(msg)
158 def _turn_deque(self):
159 self._log("_turn_deque")
163 try:
164 item = self._deque.pop()
165 self._log("popped %r" % (item,))
166 self._count('objects_queued', -1)
167 except IndexError:
168 self._log("deque is now empty")
169 self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
170 else:
171 self._lazy_tail.addCallback(lambda ign: self._process(item))
172 self._lazy_tail.addBoth(self._call_hook, 'processed')
173 self._lazy_tail.addErrback(log.err)
174 self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
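# _turn_deque is a one-item-per-turn work loop: each popped item is chained onto
# self._lazy_tail, and the next turn is rescheduled via task.deferLater(self._clock,
# self._turn_delay, ...).  Because the clock is injectable, a test can drive the
# loop deterministically with something like twisted.internet.task.Clock() and
# clock.advance(self._turn_delay) (a sketch of intended use, not code from this file).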
177 class Uploader(QueueMixin):
178 def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
179 immediate=False):
180 QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)
182 self.is_ready = False
183 self._immediate = immediate
185 if not IDirectoryNode.providedBy(upload_dirnode):
186 raise AssertionError("The URI in '%s' does not refer to a directory."
187 % os.path.join('private', 'magic_folder_dircap'))
188 if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
189 raise AssertionError("The URI in '%s' is not a writecap to a directory."
190 % os.path.join('private', 'magic_folder_dircap'))
192 self._upload_dirnode = upload_dirnode
193 self._inotify = get_inotify_module()
194 self._notifier = self._inotify.INotify()
195 self._pending = set() # of unicode relpaths
197 if hasattr(self._notifier, 'set_pending_delay'):
198 self._notifier.set_pending_delay(pending_delay)
200 # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
202 self.mask = ( self._inotify.IN_CREATE
203 | self._inotify.IN_CLOSE_WRITE
204 | self._inotify.IN_MOVED_TO
205 | self._inotify.IN_MOVED_FROM
206 | self._inotify.IN_DELETE
207 | self._inotify.IN_ONLYDIR
208 | IN_EXCL_UNLINK
209 )
210 self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
211 recursive=True)
213 def start_monitoring(self):
214 self._log("start_monitoring")
215 d = defer.succeed(None)
216 d.addCallback(lambda ign: self._notifier.startReading())
217 d.addCallback(lambda ign: self._count('dirs_monitored'))
218 d.addBoth(self._call_hook, 'started')
221 def stop(self):
223 self._notifier.stopReading()
224 self._count('dirs_monitored', -1)
225 if hasattr(self._notifier, 'wait_until_stopped'):
226 d = self._notifier.wait_until_stopped()
227 else:
228 d = defer.succeed(None)
229 d.addCallback(lambda ign: self._lazy_tail)
232 def start_uploading(self):
233 self._log("start_uploading")
236 all_relpaths = self._db.get_all_relpaths()
237 self._log("all relpaths: %r" % (all_relpaths,))
239 for relpath_u in all_relpaths:
240 self._add_pending(relpath_u)
243 self._extend_queue_and_keep_going(self._pending)
245 def _extend_queue_and_keep_going(self, relpaths_u):
246 self._log("queueing %r" % (relpaths_u,))
247 self._deque.extend(relpaths_u)
248 self._count('objects_queued', len(relpaths_u))
251 if self._immediate: # for tests
252 self._turn_deque()
253 else:
254 self._clock.callLater(0, self._turn_deque)
256 def _full_scan(self):
258 self._log("_pending %r" % (self._pending))
261 def _add_pending(self, relpath_u):
262 if not magicpath.should_ignore_file(relpath_u):
263 self._pending.add(relpath_u)
265 def _scan(self, reldir_u):
266 # Scan a directory by (synchronously) adding the paths of all its children to self._pending.
267 # Note that this doesn't add them to the deque -- that will
269 self._log("scan %r" % (reldir_u,))
270 fp = self._get_filepath(reldir_u)
271 try:
272 children = listdir_filepath(fp)
273 except EnvironmentError:
274 raise Exception("WARNING: magic folder: permission denied on directory %s"
275 % quote_filepath(fp))
276 except FilenameEncodingError:
277 raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
278 % quote_filepath(fp))
280 for child in children:
281 _assert(isinstance(child, unicode), child=child)
282 self._add_pending("%s/%s" % (reldir_u, child) if reldir_u != u"" else child)
284 def is_pending(self, relpath_u):
285 return relpath_u in self._pending
287 def _notify(self, opaque, path, events_mask):
288 self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
289 relpath_u = self._get_relpath(path)
291 # We filter out IN_CREATE events not associated with a directory.
292 # Acting on IN_CREATE for files could cause us to read and upload
293 # a possibly-incomplete file before the application has closed it.
294 # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
295 # It isn't possible to avoid watching for IN_CREATE at all, because
296 # it is the only event notified for a directory creation.
298 if ((events_mask & self._inotify.IN_CREATE) != 0 and
299 (events_mask & self._inotify.IN_ISDIR) == 0):
300 self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
301 return
302 if relpath_u in self._pending:
303 self._log("not queueing %r because it is already pending" % (relpath_u,))
304 return
305 if magicpath.should_ignore_file(relpath_u):
306 self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
307 return
309 self._pending.add(relpath_u)
310 self._extend_queue_and_keep_going([relpath_u])
312 def _when_queue_is_empty(self):
313 return defer.succeed(None)
315 def _process(self, relpath_u):
317 self._log("_process(%r)" % (relpath_u,))
318 if relpath_u is None:
320 precondition(isinstance(relpath_u, unicode), relpath_u)
321 precondition(not relpath_u.endswith(u'/'), relpath_u)
323 d = defer.succeed(None)
325 def _maybe_upload(val, now=None):
328 fp = self._get_filepath(relpath_u)
329 pathinfo = get_pathinfo(unicode_from_filepath(fp))
331 self._log("about to remove %r from pending set %r" %
332 (relpath_u, self._pending))
333 self._pending.remove(relpath_u)
334 encoded_path_u = magicpath.path2magic(relpath_u)
336 if not pathinfo.exists:
337 # FIXME merge this with the 'isfile' case.
338 self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
339 self._count('objects_disappeared')
341 db_entry = self._db.get_db_entry(relpath_u)
345 last_downloaded_timestamp = now # is this correct?
347 if is_new_file(pathinfo, db_entry):
348 new_version = db_entry.version + 1
349 else:
350 self._log("Not uploading %r" % (relpath_u,))
351 self._count('objects_not_uploaded')
352 return
354 metadata = { 'version': new_version,
355 'deleted': True,
356 'last_downloaded_timestamp': last_downloaded_timestamp }
357 if db_entry.last_downloaded_uri is not None:
358 metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri
360 empty_uploadable = Data("", self._client.convergence)
361 d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
362 metadata=metadata, overwrite=True)
364 def _add_db_entry(filenode):
365 filecap = filenode.get_uri()
366 last_downloaded_uri = metadata.get('last_downloaded_uri', None)
367 self._db.did_upload_version(relpath_u, new_version, filecap,
368 last_downloaded_uri, last_downloaded_timestamp,
369 pathinfo)
370 self._count('files_uploaded')
371 d2.addCallback(_add_db_entry)
372 return d2
373 elif pathinfo.islink:
374 self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
375 return None
376 elif pathinfo.isdir:
377 if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
378 self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)
380 uploadable = Data("", self._client.convergence)
381 encoded_path_u += magicpath.path2magic(u"/")
382 self._log("encoded_path_u = %r" % (encoded_path_u,))
383 upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
384 def _dir_succeeded(ign):
385 self._log("created subdirectory %r" % (relpath_u,))
386 self._count('directories_created')
387 def _dir_failed(f):
388 self._log("failed to create subdirectory %r" % (relpath_u,))
390 upload_d.addCallbacks(_dir_succeeded, _dir_failed)
391 upload_d.addCallback(lambda ign: self._scan(relpath_u))
392 upload_d.addCallback(lambda ign: self._extend_queue_and_keep_going(self._pending))
394 elif pathinfo.isfile:
395 db_entry = self._db.get_db_entry(relpath_u)
397 last_downloaded_timestamp = now
399 if db_entry is None:
400 new_version = 0
401 elif is_new_file(pathinfo, db_entry):
402 new_version = db_entry.version + 1
403 else:
404 self._log("Not uploading %r" % (relpath_u,))
405 self._count('objects_not_uploaded')
406 return
408 metadata = { 'version': new_version,
409 'last_downloaded_timestamp': last_downloaded_timestamp }
410 if db_entry is not None and db_entry.last_downloaded_uri is not None:
411 metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri
413 uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
414 d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
415 metadata=metadata, overwrite=True)
417 def _add_db_entry(filenode):
418 filecap = filenode.get_uri()
419 last_downloaded_uri = metadata.get('last_downloaded_uri', None)
420 self._db.did_upload_version(relpath_u, new_version, filecap,
421 last_downloaded_uri, last_downloaded_timestamp,
422 pathinfo)
423 self._count('files_uploaded')
424 d2.addCallback(_add_db_entry)
425 return d2
426 else:
427 self.warn("WARNING: cannot process special file %s" % quote_filepath(fp))
430 d.addCallback(_maybe_upload)
432 def _succeeded(res):
433 self._count('objects_succeeded')
434 return res
435 def _failed(f):
436 self._count('objects_failed')
437 self._log("%s while processing %r" % (f, relpath_u))
438 return f
439 d.addCallbacks(_succeeded, _failed)
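# Sketch of the Tahoe-side metadata written by _process above (field names taken
# from the code; the values shown are hypothetical):
#
#     { 'version': 3,                          # increases by one per new local version
#       'last_downloaded_timestamp': 1456789.0,
#       'last_downloaded_uri': 'URI:CHK:...',  # only when the local db has one
#       'deleted': True }                      # only for the deletion case
#
# The Downloader reads these same fields back to order versions and to detect
# conflicts and deletions.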
442 def _get_metadata(self, encoded_path_u):
444 d = self._upload_dirnode.get_metadata_for(encoded_path_u)
449 def _get_filenode(self, encoded_path_u):
451 d = self._upload_dirnode.get(encoded_path_u)
457 class WriteFileMixin(object):
458 FUDGE_SECONDS = 10.0
460 def _get_conflicted_filename(self, abspath_u):
461 return abspath_u + u".conflict"
463 def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
464 self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
465 % (abspath_u, len(file_contents), is_conflict, now))
467 # 1. Write a temporary file, say .foo.tmp.
468 # 2. is_conflict determines whether this is an overwrite or a conflict.
469 # 3. Set the mtime of the replacement file to be T seconds before the
470 # current local time.
471 # 4. Perform a file replacement with backup filename foo.backup,
472 # replaced file foo, and replacement file .foo.tmp. If any step of
473 # this operation fails, reclassify as a conflict and stop.
475 # Returns the path of the destination file.
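# (The replacement's mtime is deliberately backdated by FUDGE_SECONDS in step 3,
# presumably so that a later local edit always looks strictly newer than the
# downloaded copy; fileutil.replace_file() is expected to raise
# fileutil.ConflictError when the destination cannot be replaced safely, which is
# what triggers the reclassification to a conflict in step 4.)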
477 precondition_abspath(abspath_u)
478 replacement_path_u = abspath_u + u".tmp" # FIXME more unique
479 backup_path_u = abspath_u + u".backup"
483 # ensure parent directory exists
484 head, tail = os.path.split(abspath_u)
486 old_mask = os.umask(self._umask)
487 try:
488 fileutil.make_dirs(head, (~ self._umask) & 0777)
489 fileutil.write(replacement_path_u, file_contents)
490 finally:
491 os.umask(old_mask)
493 os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
494 if is_conflict:
495 print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
496 return self._rename_conflicted_file(abspath_u, replacement_path_u)
497 else:
498 try:
499 fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
500 return abspath_u
501 except fileutil.ConflictError:
502 return self._rename_conflicted_file(abspath_u, replacement_path_u)
504 def _rename_conflicted_file(self, abspath_u, replacement_path_u):
505 self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))
507 conflict_path_u = self._get_conflicted_filename(abspath_u)
508 print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
509 if os.path.isfile(replacement_path_u):
510 print "%r exists" % (replacement_path_u,)
511 if os.path.isfile(conflict_path_u):
512 print "%r exists" % (conflict_path_u,)
514 fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
515 return conflict_path_u
517 def _rename_deleted_file(self, abspath_u):
518 self._log('renaming deleted file to backup: %s' % (abspath_u,))
519 try:
520 fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
521 except OSError:
522 self._log("Already gone: '%s'" % (abspath_u,))
526 class Downloader(QueueMixin, WriteFileMixin):
527 REMOTE_SCAN_INTERVAL = 3 # facilitates tests
529 def __init__(self, client, local_path_u, db, collective_dirnode,
530 upload_readonly_dircap, clock, is_upload_pending, umask):
531 QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)
533 if not IDirectoryNode.providedBy(collective_dirnode):
534 raise AssertionError("The URI in '%s' does not refer to a directory."
535 % os.path.join('private', 'collective_dircap'))
536 if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
537 raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
538 % os.path.join('private', 'collective_dircap'))
540 self._collective_dirnode = collective_dirnode
541 self._upload_readonly_dircap = upload_readonly_dircap
542 self._is_upload_pending = is_upload_pending
543 self._umask = umask
545 def start_downloading(self):
546 self._log("start_downloading")
547 files = self._db.get_all_relpaths()
548 self._log("all files %s" % files)
550 d = self._scan_remote_collective(scan_self=True)
551 d.addBoth(self._logcb, "after _scan_remote_collective 0")
555 def stop(self):
557 d = defer.succeed(None)
558 d.addCallback(lambda ign: self._lazy_tail)
561 def _should_download(self, relpath_u, remote_version):
562 """
563 _should_download returns a bool indicating whether or not a remote object should be downloaded.
564 We check the remote metadata version against our magic-folder db version number;
565 the remote object is downloaded only when its version is newer than ours.
566 """
567 self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
568 if magicpath.should_ignore_file(relpath_u):
572 db_entry = self._db.get_db_entry(relpath_u)
573 if db_entry is None:
574 return True
575 self._log("version %r" % (db_entry.version,))
576 return (db_entry.version < remote_version)
578 def _get_local_latest(self, relpath_u):
579 """
580 _get_local_latest takes a unicode relative path and checks whether the
581 corresponding local file exists; if not, it returns None,
582 otherwise it looks up the entry in our magic-folder db and returns its version number.
583 """
584 if not self._get_filepath(relpath_u).exists():
585 return None
586 db_entry = self._db.get_db_entry(relpath_u)
587 return None if db_entry is None else db_entry.version
589 def _get_collective_latest_file(self, filename):
590 """
591 _get_collective_latest_file takes a file path pointing to a file managed by
592 magic-folder and returns a deferred that fires with a 2-tuple containing the
593 file node and metadata for the latest version of that file found in the
594 magic-folder collective directory.
595 """
596 collective_dirmap_d = self._collective_dirnode.list()
597 def scan_collective(result):
598 list_of_deferreds = []
599 for dir_name in result.keys():
600 # XXX make sure it's a directory
601 d = defer.succeed(None)
602 d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
603 list_of_deferreds.append(d)
604 deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
605 return deferList
606 collective_dirmap_d.addCallback(scan_collective)
607 def highest_version(deferredList):
608 max_version = 0
609 metadata = None
610 node = None
611 for success, result in deferredList:
612 if success:
613 if result[1]['version'] > max_version:
614 node, metadata = result
615 max_version = result[1]['version']
616 return node, metadata
617 collective_dirmap_d.addCallback(highest_version)
618 return collective_dirmap_d
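# The deferred returned above fires with (node, metadata) for whichever
# participant's DMD advertises the highest 'version' for `filename`, or with
# (None, None) when no suitable entry is found.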
620 def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
621 self._log("_scan_remote_dmd nickname %r" % (nickname,))
622 d = dirnode.list()
623 def scan_listing(listing_map):
624 for encoded_relpath_u in listing_map.keys():
625 relpath_u = magicpath.magic2path(encoded_relpath_u)
626 self._log("found %r" % (relpath_u,))
628 file_node, metadata = listing_map[encoded_relpath_u]
629 local_version = self._get_local_latest(relpath_u)
630 remote_version = metadata.get('version', None)
631 self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))
633 if local_version is None or remote_version is None or local_version < remote_version:
634 self._log("%r added to download queue" % (relpath_u,))
635 if scan_batch.has_key(relpath_u):
636 scan_batch[relpath_u] += [(file_node, metadata)]
637 else:
638 scan_batch[relpath_u] = [(file_node, metadata)]
640 d.addCallback(scan_listing)
641 d.addBoth(self._logcb, "end of _scan_remote_dmd")
644 def _scan_remote_collective(self, scan_self=False):
645 self._log("_scan_remote_collective")
646 scan_batch = {} # path -> [(filenode, metadata)]
648 d = self._collective_dirnode.list()
649 def scan_collective(dirmap):
650 d2 = defer.succeed(None)
651 for dir_name in dirmap:
652 (dirnode, metadata) = dirmap[dir_name]
653 if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
654 d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
655 self._scan_remote_dmd(dir_name, dirnode, scan_batch))
656 def _err(f, dir_name=dir_name):
657 self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
658 # XXX what should we do to make this failure more visible to users?
659 d2.addErrback(_err)
661 return d2
662 d.addCallback(scan_collective)
664 def _filter_batch_to_deque(ign):
665 self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
666 for relpath_u in scan_batch.keys():
667 file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])
669 if self._should_download(relpath_u, metadata['version']):
670 self._deque.append( (relpath_u, file_node, metadata) )
671 else:
672 self._log("Excluding %r" % (relpath_u,))
673 self._call_hook(None, 'processed')
675 self._log("deque after = %r" % (self._deque,))
676 d.addCallback(_filter_batch_to_deque)
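# scan_batch maps relpath -> [(file_node, metadata), ...], one entry per
# participant DMD that lists the path; the max(..., key=version) above keeps only
# the highest-version candidate before consulting _should_download.  For example
# (hypothetical): {u"a.txt": [(nodeA, {'version': 1}), (nodeB, {'version': 2})]}
# yields (nodeB, {'version': 2}) as the candidate.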
679 def _when_queue_is_empty(self):
680 d = task.deferLater(self._clock, self.REMOTE_SCAN_INTERVAL, self._scan_remote_collective)
681 d.addBoth(self._logcb, "after _scan_remote_collective 1")
682 d.addCallback(lambda ign: self._turn_deque())
685 def _process(self, item, now=None):
687 self._log("_process(%r)" % (item,))
690 (relpath_u, file_node, metadata) = item
691 fp = self._get_filepath(relpath_u)
692 abspath_u = unicode_from_filepath(fp)
693 conflict_path_u = self._get_conflicted_filename(abspath_u)
695 d = defer.succeed(None)
697 def do_update_db(written_abspath_u):
698 filecap = file_node.get_uri()
699 last_uploaded_uri = metadata.get('last_uploaded_uri', None)
700 last_downloaded_uri = filecap
701 last_downloaded_timestamp = now
702 written_pathinfo = get_pathinfo(written_abspath_u)
704 if not written_pathinfo.exists and not metadata.get('deleted', False):
705 raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))
707 self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
708 last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
709 self._count('objects_downloaded')
710 def failed(f):
711 self._log("download failed: %s" % (str(f),))
712 self._count('objects_failed')
713 return f
715 if os.path.isfile(conflict_path_u):
717 raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
720 is_conflict = False
721 db_entry = self._db.get_db_entry(relpath_u)
722 dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
723 dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
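# The checks below flag the download as a conflict when (a) the collective's
# last_downloaded_uri disagrees with what our local db recorded as last
# downloaded, (b) the collective's last_uploaded_uri is not what we last
# uploaded, or (c) we still have our own upload pending for this relpath;
# otherwise the remote version simply overwrites the local file.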
724 if db_entry:
725 if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
726 if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
727 is_conflict = True
728 self._count('objects_conflicted')
729 elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
730 is_conflict = True
731 self._count('objects_conflicted')
732 elif self._is_upload_pending(relpath_u):
733 is_conflict = True
734 self._count('objects_conflicted')
736 if relpath_u.endswith(u"/"):
737 if metadata.get('deleted', False):
738 self._log("rmdir(%r) ignored" % (abspath_u,))
739 else:
740 self._log("mkdir(%r)" % (abspath_u,))
741 d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
742 d.addCallback(lambda ign: abspath_u)
743 else:
744 if metadata.get('deleted', False):
745 d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
746 else:
747 d.addCallback(lambda ign: file_node.download_best_version())
748 d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
749 is_conflict=is_conflict))
751 d.addCallbacks(do_update_db, failed)
753 def trap_conflicts(f):
754 f.trap(ConflictError)
756 d.addErrback(trap_conflicts)