4 from collections import deque
7 from twisted.internet import defer, reactor, task
8 from twisted.python.failure import Failure
9 from twisted.python import runtime
10 from twisted.application import service
12 from allmydata.util import fileutil
13 from allmydata.interfaces import IDirectoryNode
14 from allmydata.util import log
15 from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
16 from allmydata.util.assertutil import precondition, _assert
17 from allmydata.util.deferredutil import HookMixin
18 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
19 extend_filepath, unicode_from_filepath, unicode_segments_from, \
20 quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
21 from allmydata.immutable.upload import FileName, Data
22 from allmydata import magicfolderdb, magicpath
# inotify flag not exposed by all notifier implementations: suppresses
# events for children that have been unlinked from the watched directory.
# NOTE(review): value appears to match the Linux IN_EXCL_UNLINK constant
# (0x04000000) -- confirm against <sys/inotify.h>.
IN_EXCL_UNLINK = 0x04000000L
def get_inotify_module():
    # Select a platform-appropriate filesystem-notification module:
    # the allmydata Windows shim on win32, Twisted's native inotify on
    # Linux where supported; otherwise Magic Folder cannot run.
    # NOTE(review): the try/else framing around these branches is elided in
    # this view; the ImportError/AttributeError handler below implies the
    # imports are probed and failures re-reported per platform.
        if sys.platform == "win32":
            # Windows: ReadDirectoryChangesW-based shim shipped with allmydata.
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            # Linux with a new-enough Twisted: native inotify support.
            from twisted.internet import inotify
        # No supported notification mechanism on this platform.
        raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                  "This currently requires Linux or Windows.")
    except (ImportError, AttributeError) as e:
        if sys.platform == "win32":
            # Import of the Windows shim failed: report the OS requirement.
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
class MagicFolder(service.MultiService):
    """
    Top-level service for one magic folder: ties a local directory and its
    magic-folder db to an Uploader/Downloader pair that synchronize it with
    the upload DMD (writecap) and the collective directory (readcap).
    """
    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
                 pending_delay=1.0, clock=reactor):
        # local_path_u must be an absolute unicode path to the watched dir.
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        # NOTE(review): returning a Failure from __init__ is suspicious --
        # the caller receives a half-constructed instance instead of an
        # exception; consider raising.  (The guard condition is elided here.)
            return Failure(Exception('ERROR: Unable to load magic folder db.'))

        upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        # The downloader is given our own DMD's *readonly* URI so it can
        # skip scanning our own uploads.
        self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock)
        self.downloader = Downloader(client, local_path_u, db, collective_dirnode, upload_dirnode.get_readonly_uri(), clock)

    def startService(self):
        # TODO: why is this being called more than once?
        # Idempotence guard: bail out if already running (condition elided).
            return defer.succeed(None)
        print "%r.startService" % (self,)
        service.MultiService.startService(self)
        # Begin watching the local directory for filesystem events.
        return self.uploader.start_monitoring()

        """ready is used to signal us to start
        processing the upload and download items...
        # Kick off the initial scans; the result fires after the uploader's
        # scan, then waits on the downloader's.
        d = self.uploader.start_scanning()
        d2 = self.downloader.start_scanning()
        d.addCallback(lambda ign: d2)

        # (finish/stop) -- stop the uploader first, then the downloader.
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)

    def remove_service(self):
        # Detach this MagicFolder from its parent MultiService.
        return service.MultiService.disownServiceParent(self)
class QueueMixin(HookMixin):
    """
    Shared machinery for Uploader and Downloader: a deque of pending work
    items drained one at a time through a chained Deferred
    (self._lazy_tail), plus 'started'/'processed' test hooks and per-folder
    stats counters.
    """
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        # Test hooks (see HookMixin): 'started' fires when monitoring
        # begins, 'processed' after each queued item completes.
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()                  # work items waiting to run
        self._lazy_tail = defer.succeed(None)  # serializes item processing
        self._pending = set()                  # relpaths queued or in flight
        self._stopped = False

    def _get_filepath(self, relpath_u):
        # Map a u"/"-separated relative path to a FilePath under the
        # local magic folder directory.
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        # Inverse of _get_filepath: FilePath -> u"/"-joined relative path.
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        # Bump a stats counter namespaced by queue name
        # ('uploader'/'downloader').
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        # Deferred-chain logging helper.  NOTE(review): presumably returns
        # res to preserve the chain value -- the return line is elided here.
        self._log("%s: %r" % (msg, res))

        # NOTE(review): the following lines belong to the _log() helper,
        # whose def line is elided; it tags messages with the client
        # nickname and queue name.
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        #open("events", "ab+").write(msg)

    def _append_to_deque(self, relpath_u):
        # Queue a relpath for processing unless it is already pending or
        # matches the ignore rules; schedules a deque turn on the reactor.
        self._log("_append_to_deque(%r)" % (relpath_u,))
        if relpath_u in self._pending or magicpath.should_ignore_file(relpath_u):
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        # Turn the deque asynchronously so the notifier callback returns fast.
        self._clock.callLater(0, self._turn_deque)

    def _turn_deque(self):
        # Pop one item and chain its processing onto self._lazy_tail, then
        # reschedule another turn after self._turn_delay.  The empty-deque
        # case (IndexError, handler header elided) calls
        # _when_queue_is_empty instead.
        self._log("_turn_deque")
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        self._lazy_tail.addCallback(lambda ign: self._process(item))
        self._lazy_tail.addBoth(self._call_hook, 'processed')
        self._lazy_tail.addErrback(log.err)
        self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
class Uploader(QueueMixin):
    """
    Watches the local magic folder directory with an inotify-style notifier
    and uploads created/changed/deleted objects into the upload DMD (a
    writeable Tahoe directory node).
    """
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False

        # The upload DMD must be a known, writeable directory node.
        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()

        # Not every notifier implementation supports event batching
        # (hasattr-probe rather than isinstance).
        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF, IN_MOVED_FROM, or IN_UNMOUNT?
        # Event mask for the watch (expression continues on elided lines).
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],

    def start_monitoring(self):
        # Start delivering notifier events; fires the 'started' test hook.
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')

        # NOTE(review): the following lines belong to stop() (def elided):
        # tear down the notifier, then wait for in-flight queue work.
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)

    def start_scanning(self):
        # Prime the queue with every relpath already recorded in the db,
        # then start turning the deque.
        self._log("start_scanning")
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending))
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already processed
            # (normally because they have been deleted on disk).
            self._log("adding %r" % (self._pending))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())

    def _scan(self, reldir_u):
        # List a local (sub)directory and queue/process each child.
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            _assert(isinstance(child, unicode), child=child)
            # Bind `child` as a default argument so each callback sees its
            # own value rather than the loop's last one.
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                self._pending.add(relpath_u)
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

    def _notify(self, opaque, path, events_mask):
        # Notifier callback: translate a filesystem event into a queued
        # relpath.
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring inotify event for creation of file %r\n" % (path,))

        relpath_u = self._get_relpath(path)
        self._append_to_deque(relpath_u)

    def _when_queue_is_empty(self):
        # The uploader is purely event-driven; nothing to do when idle.
        return defer.succeed(None)

    def _process(self, relpath_u):
        # Upload (or record the deletion of) one relative path.  All real
        # work happens in _maybe_upload on the Deferred chain.
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
        precondition(isinstance(relpath_u, unicode), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("pending = %r, about to remove %r" % (self._pending, relpath_u))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # The local object vanished: record the deletion in the DMD
                # by uploading an empty placeholder entry.
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')
                if not self._db.check_file_db_exists(relpath_u):

                last_downloaded_timestamp = now
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)

            elif pathinfo.islink:
                self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                # NOTE(review): the following appears to be the isdir branch
                # (its elif header is elided): watch the new local subdir,
                # create the remote subdirectory entry, then scan it.
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                uploadable = Data("", self._client.convergence)
                # Directories are represented with a trailing (encoded) "/".
                encoded_path_u += magicpath.path2magic(u"/")
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))

            elif pathinfo.isfile:
                # Regular file: compute the next version number and upload
                # the contents, carrying last_downloaded_* for conflict
                # detection on the download side.
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                last_downloaded_timestamp = now

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                # Neither absent, symlink, dir, nor regular file.
                self.warn("WARNING: cannot process special file %s" % quote_filepath(fp))

        d.addCallback(_maybe_upload)
        # NOTE(review): the next three lines belong to local _succeeded /
        # _failed callbacks whose def lines are elided.
            self._count('objects_succeeded')
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
        d.addCallbacks(_succeeded, _failed)

    def _get_metadata(self, encoded_path_u):
        # Fetch DMD metadata for an encoded path (result/errback handling
        # elided in this view).
        d = self._upload_dirnode.get_metadata_for(encoded_path_u)

    def _get_filenode(self, encoded_path_u):
        # Fetch the DMD filenode for an encoded path (result/errback
        # handling elided in this view).
        d = self._upload_dirnode.get(encoded_path_u)
class WriteFileMixin(object):
    """
    Helpers for writing downloaded file contents to the local filesystem
    safely: write-to-temp-then-replace, with .backup and .conflict
    side files.
    """
    def _get_conflicted_filename(self, abspath_u):
        # Conflicted versions live alongside the original file.
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)
        # NOTE(review): `mode` is assigned on a line elided from this view.
            fileutil.make_dirs(head, mode)

        fileutil.write(replacement_path_u, file_contents)
        # Backdate the replacement so a subsequent local edit registers as
        # newer.  NOTE(review): os.utime takes (atime, mtime) -- here atime
        # is `now` and mtime is `now - FUDGE_SECONDS`; confirm the ordering
        # is intentional.
        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
            # Known conflict: divert the replacement to the .conflict name.
            print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
            # Normal case: atomically replace, keeping a .backup copy.
            fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
        except fileutil.ConflictError:
            # Replacement raced with a local change: reclassify as conflict.
            return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        # Move the downloaded replacement to <name>.conflict, leaving the
        # local version in place; returns the conflict path.
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
        if os.path.isfile(replacement_path_u):
            print "%r exists" % (replacement_path_u,)
        if os.path.isfile(conflict_path_u):
            print "%r exists" % (conflict_path_u,)

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        # A remotely-deleted file is renamed to <name>.backup rather than
        # removed outright; an already-missing file is logged and ignored.
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        # XXX is this the correct error?
            self._log("Already gone: '%s'" % (abspath_u,))
class Downloader(QueueMixin, WriteFileMixin):
    """
    Periodically scans the collective directory's per-client DMDs for
    objects newer than our local magic-folder db entries, downloads them,
    and writes them locally (via WriteFileMixin) with conflict handling.
    """
    REMOTE_SCAN_INTERVAL = 3 # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode, upload_readonly_dircap, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        # The collective must be a known, read-only directory node.
        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        # Read-only URI of our OWN upload DMD, used to skip it when scanning.
        self._upload_readonly_dircap = upload_readonly_dircap

        self._turn_delay = self.REMOTE_SCAN_INTERVAL
        self._download_scan_batch = {} # path -> [(filenode, metadata)]

    def start_scanning(self):
        # Kick off the first remote scan of the collective.
        self._log("start_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective()
        # NOTE(review): the following lines belong to stop() (def elided):
        # wait for in-flight queue work to drain.
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)

    def _should_download(self, relpath_u, remote_version):
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
        # Download when we have no local version or ours is older.
        v = self._db.get_local_file_version(relpath_u)
        self._log("v = %r" % (v,))
        return (v is None or v < remote_version)

    def _get_local_latest(self, relpath_u):
        _get_local_latest takes a unicode path string checks to see if this file object
        exists in our magic-folder db; if not then return None
        else check for an entry in our magic-folder db and return the version number.
        if not self._get_filepath(relpath_u).exists():
        return self._db.get_local_file_version(relpath_u)

    def _get_collective_latest_file(self, filename):
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with the two tuple containing a
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory.
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            # Ask every participant DMD for its child-and-metadata for
            # `filename`; failures are collected, not raised.
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            # Pick the (node, metadata) pair with the largest
            # metadata['version'] among successful results.
            for success, result in deferredList:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _append_to_batch(self, name, file_node, metadata):
        # Accumulate per-relpath candidates; several DMDs may each hold a
        # version of the same file.
        if self._download_scan_batch.has_key(name):
            self._download_scan_batch[name] += [(file_node, metadata)]
            self._download_scan_batch[name] = [(file_node, metadata)]

    def _scan_remote(self, nickname, dirnode):
        # Scan one participant's DMD, batching every entry that is newer
        # than (or absent from) our local db.
        self._log("_scan_remote nickname %r" % (nickname,))
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))
                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    self._append_to_batch(relpath_u, file_node, metadata)
        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote")

    def _scan_remote_collective(self):
        # Scan every participant DMD except our own, then filter the batch
        # and feed the results into the download queue.
        self._log("_scan_remote_collective")
        self._download_scan_batch = {} # XXX

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                # Skip our own DMD (compared by read-only URI).
                if dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    # NOTE(review): only dir_name is bound as a default arg;
                    # `dirnode` is captured late, so by the time these
                    # callbacks run it refers to the LAST directory of the
                    # loop -- likely a bug; should bind dirnode=dirnode too.
                    d2.addCallback(lambda ign, dir_name=dir_name: self._scan_remote(dir_name, dirnode))
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
        d.addCallback(scan_collective)
        d.addCallback(self._filter_scan_batch)
        d.addCallback(self._add_batch_to_download_queue)

    def _add_batch_to_download_queue(self, result):
        # Move filtered (relpath, node, metadata) triples into the deque
        # and mark their relpaths pending.
        self._log("result = %r" % (result,))
        self._log("deque = %r" % (self._deque,))
        self._deque.extend(result)
        self._log("deque after = %r" % (self._deque,))
        self._count('objects_queued', len(result))
        self._log("pending = %r" % (self._pending,))
        self._pending.update(map(lambda x: x[0], result))
        self._log("pending after = %r" % (self._pending,))

    def _filter_scan_batch(self, result):
        # Reduce the batch to one best candidate per relpath (highest
        # remote version) that actually needs downloading.
        self._log("_filter_scan_batch")
        extension = [] # consider whether this should be a dict
        for relpath_u in self._download_scan_batch.keys():
            if relpath_u in self._pending:
            file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version'])
            if self._should_download(relpath_u, metadata['version']):
                extension += [(relpath_u, file_node, metadata)]
                self._log("Excluding %r" % (relpath_u,))
                self._count('objects_excluded')
                self._call_hook(None, 'processed')

    def _when_queue_is_empty(self):
        # Idle downloader: schedule the next remote scan, then resume
        # turning the deque.
        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective")
        d.addCallback(lambda ign: self._turn_deque())

    def _process(self, item, now=None):
        # Download one batched (relpath_u, file_node, metadata) item, write
        # it locally (handling deletes, directories and conflicts), then
        # record the result in the magic-folder db.
        self._log("_process(%r)" % (item,))
            (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)
        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
            # NOTE(review): the next two lines belong to the `failed`
            # errback (its def line is elided).
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')

        # An existing .conflict file blocks any further downloads.
        if os.path.isfile(conflict_path_u):
            raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
        if self._db.check_file_db_exists(relpath_u):
            # Conflict detection: compare what the remote side says was
            # last downloaded against what WE last downloaded.
            dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
            local_last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
            print "metadata %r" % (metadata,)
            print "<<<<--- if %r != %r" % (dmd_last_downloaded_uri, local_last_downloaded_uri)
            if dmd_last_downloaded_uri is not None and local_last_downloaded_uri is not None:
                if dmd_last_downloaded_uri != local_last_downloaded_uri:
                    self._count('objects_conflicted')
                    #dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
                    #local_last_uploaded_uri = ...
        # Trailing "/" marks a directory entry.
        if relpath_u.endswith(u"/"):
            if metadata.get('deleted', False):
                self._log("rmdir(%r) ignored" % (abspath_u,))
                self._log("mkdir(%r)" % (abspath_u,))
                d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                d.addCallback(lambda ign: abspath_u)
            # Regular file: either rename the deleted file aside or
            # download the best version and write it (possibly conflicted).
            if metadata.get('deleted', False):
                d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                d.addCallback(lambda ign: file_node.download_best_version())
                d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                           is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def remove_from_pending(res):
            # Always clear the pending mark, success or failure.
            self._pending.remove(relpath_u)
        d.addBoth(remove_from_pending)
        def trap_conflicts(f):
            # Conflicts are an expected outcome, not an error to propagate.
            f.trap(ConflictError)
        d.addErrback(trap_conflicts)