# tahoe-lafs: src/allmydata/frontends/magic_folder.py
# Simplify and fix non-existent-file handling.
2 import sys, os
3 import os.path
4 from collections import deque
5 import time
6
7 from twisted.internet import defer, reactor, task
8 from twisted.python.failure import Failure
9 from twisted.python import runtime
10 from twisted.application import service
11
12 from allmydata.util import fileutil
13 from allmydata.interfaces import IDirectoryNode
14 from allmydata.util import log
15 from allmydata.util.fileutil import precondition_abspath, get_pathinfo
16 from allmydata.util.assertutil import precondition
17 from allmydata.util.deferredutil import HookMixin
18 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
19      extend_filepath, unicode_from_filepath, unicode_segments_from, \
20      quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
21 from allmydata.immutable.upload import FileName, Data
22 from allmydata import magicfolderdb, magicpath
23
24
25 IN_EXCL_UNLINK = 0x04000000L
26
27 def get_inotify_module():
28     try:
29         if sys.platform == "win32":
30             from allmydata.windows import inotify
31         elif runtime.platform.supportsINotify():
32             from twisted.internet import inotify
33         else:
34             raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
35                                       "This currently requires Linux or Windows.")
36         return inotify
37     except (ImportError, AttributeError) as e:
38         log.msg(e)
39         if sys.platform == "win32":
40             raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
41                                       "Windows support requires at least Vista, and has only been tested on Windows 7.")
42         raise
43
44
45 class MagicFolder(service.MultiService):
46     name = 'magic-folder'
47
48     def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
49                  pending_delay=1.0, clock=reactor):
50         precondition_abspath(local_path_u)
51
52         service.MultiService.__init__(self)
53
54         db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
55         if db is None:
56             return Failure(Exception('ERROR: Unable to load magic folder db.'))
57
58         # for tests
59         self._client = client
60         self._db = db
61
62         self.is_ready = False
63
64         self.uploader = Uploader(client, local_path_u, db, upload_dircap, pending_delay, clock)
65         self.downloader = Downloader(client, local_path_u, db, collective_dircap, clock)
66
67     def startService(self):
68         # TODO: why is this being called more than once?
69         if self.running:
70             return defer.succeed(None)
71         print "%r.startService" % (self,)
72         service.MultiService.startService(self)
73         return self.uploader.start_monitoring()
74
75     def ready(self):
76         """ready is used to signal us to start
77         processing the upload and download items...
78         """
79         self.is_ready = True
80         d = self.uploader.start_scanning()
81         d2 = self.downloader.start_scanning()
82         d.addCallback(lambda ign: d2)
83         return d
84
85     def finish(self):
86         print "finish"
87         d = self.uploader.stop()
88         d2 = self.downloader.stop()
89         d.addCallback(lambda ign: d2)
90         return d
91
92     def remove_service(self):
93         return service.MultiService.disownServiceParent(self)
94
95
class QueueMixin(HookMixin):
    """
    Common machinery shared by Uploader and Downloader: a deque of pending
    work items drained one at a time through the reactor, logging/counter
    helpers, and translation between local FilePaths and the u"/"-separated
    relative paths used as magic-folder keys.

    Subclasses must define _process(item) and _when_queue_is_empty(), and are
    responsible for setting self.is_ready (referenced by _append_to_deque).
    """
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name  # e.g. 'uploader' or 'downloader'; used in logs and counter names
        self._clock = clock
        # 'processed' fires once per handled queue item; 'started' when monitoring begins.
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        # _lazy_tail serializes all queue work: each new step is chained onto
        # the end of this deferred, so at most one item is processed at a time.
        self._lazy_tail = defer.succeed(None)
        self._pending = set()   # relpaths queued (or scanned) but not yet processed
        self._stopped = False
        self._turn_delay = 0    # seconds between queue turns; Downloader overrides this

    def _get_filepath(self, relpath_u):
        # Map a u"/"-separated relative path to a FilePath under the local directory.
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        # Inverse of _get_filepath: FilePath -> u"/"-separated relative path.
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        # Bump a stats counter namespaced by queue name,
        # e.g. 'magic_folder.uploader.objects_queued'.
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _log(self, msg):
        # Log via the client's logger and echo to stdout.
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _append_to_deque(self, relpath_u):
        # Queue relpath_u for processing unless it is already pending or is an
        # ignorable magic-folder bookkeeping file.
        # NOTE(review): self.is_ready is not set by this mixin; subclasses
        # (e.g. Uploader) define it -- confirm for any new subclass.
        self._log("_append_to_deque(%r)" % (relpath_u,))
        if relpath_u in self._pending or magicpath.should_ignore_file(relpath_u):
            return
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            self._clock.callLater(0, self._turn_deque)

    def _turn_deque(self):
        # Take one item (pop() removes from the right end, so LIFO order) and
        # chain its processing onto _lazy_tail; when empty, defer to the
        # subclass's _when_queue_is_empty().
        if self._stopped:
            return
        try:
            item = self._deque.pop()
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
166
167
class Uploader(QueueMixin):
    """
    Watches the local magic-folder directory with inotify and uploads new,
    changed, and deleted files to the client's upload dircap, recording each
    uploaded version in the magic-folder db.
    """
    def __init__(self, client, local_path_u, db, upload_dircap, pending_delay, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False

        # TODO: allow a path rather than a cap URI.
        self._upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        if not IDirectoryNode.providedBy(self._upload_dirnode):
            raise AssertionError("The URI in 'private/magic_folder_dircap' does not refer to a directory.")
        if self._upload_dirnode.is_unknown() or self._upload_dirnode.is_readonly():
            raise AssertionError("The URI in 'private/magic_folder_dircap' is not a writecap to a directory.")

        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()

        # Only the Windows implementation provides set_pending_delay.
        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # We don't watch for IN_CREATE, because that would cause us to read and upload a
        # possibly-incomplete file before the application has closed it. There should always
        # be an IN_CLOSE_WRITE after an IN_CREATE (I think).
        # TODO: what about IN_MOVE_SELF, IN_MOVED_FROM, or IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        """Begin reading inotify events; fires the 'started' hook."""
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        """Stop watching; returns a deferred that fires when the queue drains."""
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        # Wait for any in-flight queue processing to finish as well.
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        """Scan the local directory, then queue db entries with no local file
        (i.e. files deleted while we weren't watching), and start the queue."""
        self._log("start_scanning")
        self.is_ready = True
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending))
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already processed
            # (normally because they have been deleted on disk).
            self._log("adding %r" % (self._pending))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
        """Recursively process every child of the local directory reldir_u
        (u"" means the top of the magic folder)."""
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        # Chain the children sequentially so uploads don't run concurrently.
        d = defer.succeed(None)
        for child in children:
            assert isinstance(child, unicode), child
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    # Returning None makes _process a no-op for this child.
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def _notify(self, opaque, path, events_mask):
        # inotify callback: translate the FilePath to a relative path and queue it.
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)
        self._append_to_deque(relpath_u)

    def _when_queue_is_empty(self):
        # The uploader is event-driven; nothing to do when the queue drains.
        return defer.succeed(None)

    def _process(self, relpath_u):
        """Upload one object: a deletion marker if it disappeared, an empty
        child + recursive scan if it is a directory, or the file contents if
        it is a regular file.  relpath_u of None is a no-op."""
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("pending = %r, about to remove %r" % (self._pending, relpath_u))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')
                if not self._db.check_file_db_exists(relpath_u):
                    # We never knew about this file; nothing to delete remotely.
                    return None

                last_downloaded_timestamp = now
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                else:
                    new_version = current_version + 1

                # Record the deletion as a new version with 'deleted' metadata
                # and empty contents.
                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                # BUG FIX: this used self.warn(), which is not defined anywhere
                # on this class or its bases and raised AttributeError; log instead.
                self._log("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                # Watch the new directory, create a placeholder child in the
                # upload dirnode, then scan its contents.
                self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)
                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                last_downloaded_timestamp = now

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    # Unchanged since the version we already recorded; skip it.
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                # BUG FIX: this used self.warn(), which is not defined anywhere
                # on this class or its bases and raised AttributeError; log instead.
                self._log("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%r while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        # Returns a deferred, or a Failure if the child is not present.
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        # Returns a deferred, or a Failure if the child is not present.
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d
401
402
class Downloader(QueueMixin):
    """
    Periodically scans the collective dircap for files whose remote version is
    newer than our local db version, downloads them, and writes them into the
    local magic folder (conflicts get renamed to '<name>.conflict').
    """
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dircap, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        # TODO: allow a path rather than a cap URI.
        self._collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        if not IDirectoryNode.providedBy(self._collective_dirnode):
            raise AssertionError("The URI in 'private/collective_dircap' does not refer to a directory.")
        if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly():
            raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.")

        # Unlike the uploader (event-driven, delay 0), the downloader polls.
        self._turn_delay = self.REMOTE_SCAN_INTERVAL
        self._download_scan_batch = {} # path -> [(filenode, metadata)]

    def start_scanning(self):
        """Kick off the first remote scan and start turning the queue."""
        self._log("start_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective()
        self._turn_deque()
        return d

    def stop(self):
        # Flag the queue as stopped, then wait for in-flight work to finish.
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.
        """
        if magicpath.should_ignore_file(relpath_u):
            return False
        v = self._db.get_local_file_version(relpath_u)
        return (v is None or v < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode path string checks to see if this file object
        exists in our magic-folder db; if not then return None
        else check for an entry in our magic-folder db and return the version number.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        return self._db.get_local_file_version(relpath_u)

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with the two tuple containing a
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            # Ask each participant's directory for this file; failures are
            # collected (consumeErrors) rather than aborting the whole scan.
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            # Pick the (node, metadata) pair with the largest 'version'.
            # Returns (None, None) if no participant had the file.
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _append_to_batch(self, name, file_node, metadata):
        # Accumulate every (node, metadata) candidate seen for 'name' during
        # the current remote scan; _filter_scan_batch picks the winner.
        if self._download_scan_batch.has_key(name):
            self._download_scan_batch[name] += [(file_node, metadata)]
        else:
            self._download_scan_batch[name] = [(file_node, metadata)]

    def _scan_remote(self, nickname, dirnode):
        """Scan one participant's directory, batching any entry whose remote
        version is newer than (or unknown relative to) our local version."""
        self._log("_scan_remote nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for name in listing_map.keys():
                file_node, metadata = listing_map[name]
                local_version = self._get_local_latest(name)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (name, local_version, remote_version))
                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("added to download queue\n")
                    self._append_to_batch(name, file_node, metadata)
        d.addCallback(scan_listing)
        return d

    def _scan_remote_collective(self):
        """Scan every participant directory in the collective, filter the
        batch down to objects we should download, and queue them."""
        self._log("_scan_remote_collective")
        self._download_scan_batch = {} # XXX

        if self._collective_dirnode is None:
            return
        collective_dirmap_d = self._collective_dirnode.list()
        def do_list(result):
            others = [x for x in result.keys()]
            return result, others
        collective_dirmap_d.addCallback(do_list)
        def scan_collective(result):
            # Scan each participant sequentially by chaining onto one deferred.
            d = defer.succeed(None)
            collective_dirmap, others_list = result
            for dir_name in others_list:
                d.addCallback(lambda x, dir_name=dir_name: self._scan_remote(dir_name, collective_dirmap[dir_name][0]))
                # XXX todo add errback
            return d
        collective_dirmap_d.addCallback(scan_collective)
        collective_dirmap_d.addCallback(self._filter_scan_batch)
        collective_dirmap_d.addCallback(self._add_batch_to_download_queue)
        return collective_dirmap_d

    def _add_batch_to_download_queue(self, result):
        # result is a list of (relpath_u, file_node, metadata) triples; queue
        # them all and mark their relpaths pending.
        self._log("result = %r" % (result,))
        self._log("deque = %r" % (self._deque,))
        self._deque.extend(result)
        self._log("deque after = %r" % (self._deque,))
        self._count('objects_queued', len(result))
        self._log("pending = %r" % (self._pending,))
        self._pending.update(map(lambda x: x[0], result))
        self._log("pending after = %r" % (self._pending,))

    def _filter_scan_batch(self, result):
        # For each batched name, keep the highest-version candidate, skipping
        # anything already pending or not worth downloading.
        extension = [] # consider whether this should be a dict
        for relpath_u in self._download_scan_batch.keys():
            if relpath_u in self._pending:
                continue
            file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version'])
            if self._should_download(relpath_u, metadata['version']):
                extension += [(relpath_u, file_node, metadata)]
        return extension

    def _when_queue_is_empty(self):
        # Poll: rescan the collective after _turn_delay seconds, then turn again.
        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        """Download one queued (relpath_u, file_node, metadata) triple, write
        it into the local folder, and record it in the db."""
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        d = file_node.download_best_version()
        def succeeded(res):
            fp = self._get_filepath(relpath_u)
            abspath_u = unicode_from_filepath(fp)
            d2 = defer.succeed(res)
            d2.addCallback(lambda result: self._write_downloaded_file(abspath_u, result, is_conflict=False))
            def do_update_db(written_abspath_u):
                filecap = file_node.get_uri()
                last_uploaded_uri = metadata.get('last_uploaded_uri', None)
                last_downloaded_uri = filecap
                last_downloaded_timestamp = now
                written_pathinfo = get_pathinfo(written_abspath_u)
                if not written_pathinfo.exists:
                    raise Exception("downloaded file %s disappeared" % quote_local_unicode_path(written_abspath_u))

                # NOTE(review): did_upload_version is used here to record a
                # *download* -- presumably the db schema is shared; confirm.
                self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                            last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            d2.addCallback(do_update_db)
            # XXX handle failure here with addErrback...
            self._count('objects_downloaded')
            return d2
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_download_failed')
            return f
        d.addCallbacks(succeeded, failed)
        def remove_from_pending(res):
            # Runs on success or failure so the path can be re-queued later.
            self._pending.remove(relpath_u)
            return res
        d.addBoth(remove_from_pending)
        return d

    # mtime skew applied to downloaded files (step 3 below).
    FUDGE_SECONDS = 10.0

    @classmethod
    def _write_downloaded_file(cls, abspath_u, file_contents, is_conflict=False, now=None):
        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)
        mode = 0777 # XXX
        fileutil.make_dirs(head, mode)

        fileutil.write(replacement_path_u, file_contents)
        os.utime(replacement_path_u, (now, now - cls.FUDGE_SECONDS))
        if is_conflict:
            return cls._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                # Lost a race with a concurrent local change: keep both copies.
                return cls._rename_conflicted_file(abspath_u, replacement_path_u)

    # NOTE(review): this is a classmethod whose first parameter is named
    # 'self'; it is bound to the class, so the name is misleading but harmless.
    @classmethod
    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        # Move the replacement aside as '<name>.conflict' instead of replacing.
        conflict_path_u = abspath_u + u".conflict"
        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u