return None
else:
return row[0]
+
+ def did_upload_file(self, filecap, path, version, mtime, ctime, size):
+ now = time.time()
+ fileid = self.get_or_allocate_fileid_for_cap(filecap)
+ try:
+ self.cursor.execute("INSERT INTO last_upload VALUES (?,?,?)",
+ (fileid, now, now))
+ except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
+ self.cursor.execute("UPDATE last_upload"
+ " SET last_uploaded=?, last_checked=?"
+ " WHERE fileid=?",
+ (now, now, fileid))
+ try:
+ self.cursor.execute("INSERT INTO local_files VALUES (?,?,?,?,?,?)",
+ (path, size, mtime, ctime, fileid, version))
+ except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
+ self.cursor.execute("UPDATE local_files"
+ " SET size=?, mtime=?, ctime=?, fileid=?, version=?"
+ " WHERE path=?",
+ (size, mtime, ctime, fileid, path, version))
+ self.connection.commit()
service.MultiService.__init__(self)
self._stopped = False
- self._remote_scan_delay = 3 # XXX
+ self._remote_scan_delay = 10 # XXX
self._local_dir = abspath_expanduser_unicode(local_dir)
self._upload_lazy_tail = defer.succeed(None)
self._upload_pending = set()
We check the remote metadata version against our magic-folder db version number;
latest version wins.
"""
- # XXX todo
- return True
+ v = self._db.get_local_file_version(path)
+ if v is None:
+ return True
+ else:
+ if v < remote_version:
+ return True
+ else:
+ return False
def _scan_remote(self, nickname, dirnode):
listing_d = dirnode.list()
+ self._download_scan_batch = {}
def scan_listing(listing_map):
for name in listing_map.keys():
file_node, metadata = listing_map[name]
d = defer.succeed(None)
collective_dirmap, others_list = result
for dir_name in others_list:
+ # XXX this is broken
d.addCallback(lambda x: self._scan_remote(dir_name, collective_dirmap[dir_name][0]))
+ collective_dirmap_d.addCallback(self._filter_scan_batch)
+ collective_dirmap_d.addCallback(self._add_batch_to_download_queue)
return d
collective_dirmap_d.addCallback(scan_collective)
- collective_dirmap_d.addCallback(self._filter_scan_batch)
- collective_dirmap_d.addCallback(self._add_batch_to_download_queue)
return collective_dirmap_d
def _add_batch_to_download_queue(self, result):
extension = []
for name in self._download_scan_batch.keys():
if name in self._download_pending:
- # XXX
continue
- if len(self._download_scan_batch[name]) == 1:
- filename, file_node, metadata = self._download_scan_batch[name][0]
+ for item in self._download_scan_batch[name]:
+ (nickname, file_node, metadata) = item
if self._should_download(name, metadata['version']):
extension += [(name, file_node, metadata)]
- else:
- for item in self._download_scan_batch:
- nickname, file_node, metadata = item
- if self._should_download(name, metadata['version']):
- extension += [(name, file_node, metadata)]
return extension
def _download_file(self, name, file_node):
- print "_download_file"
d = file_node.download_best_version()
def succeeded(res):
d.addCallback(lambda result: self._write_downloaded_file(name, result))
self._stats_provider.count('magic_folder.objects_downloaded', +1)
+ return None
def failed(f):
- pass
+ return failure.Failure("download failed")
+ def remove_from_pending(result):
+ self._download_pending = self._download_pending.difference(set([name]))
d.addCallbacks(succeeded, failed)
d.addBoth(self._do_download_callback)
+ d.addBoth(remove_from_pending)
return d
def _write_downloaded_file(self, name, file_contents):
self.is_ready = True
self._turn_upload_deque()
self._turn_download_deque()
- self._scan_remote_collective()
-
- def _append_to_download_deque(self, name, file_node):
- if name in self._download_pending:
- return
- self._download_deque.append(file_node) # XXX
- self._download_pending.add(name)
- self._stats_provider.count('magic_folder.objects_queued_for_download', 1)
- reactor.callLater(0, self._turn_download_deque)
def _turn_download_deque(self):
- print "_turn_download_deque"
if self._stopped:
return
try:
except IndexError:
self._log("magic folder upload deque is now empty")
self._download_lazy_tail = defer.succeed(None)
- self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._turn_download_deque))
+ self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._scan_remote_collective))
+ self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._turn_download_deque))
return
self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._download_file, file_path, file_node))
self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._turn_download_deque))
def get_metadata(result):
try:
- metadata_d = self._parent.get_metadata_for(name)
+ metadata_d = self._upload_dirnode.get_metadata_for(name)
except KeyError:
return failure.Failure()
return metadata_d
- def get_local_version(path):
- v = self._db.get_local_file_version(path)
- if v is None:
- return 1
- else:
- return v
-
if not os.path.exists(path):
self._log("drop-upload: notified object %r disappeared "
"(this is normal for temporary objects)" % (path,))
if self._db.check_file_db_exists(path):
d2.addCallback(get_metadata)
def set_deleted(metadata):
- metadata['version'] = get_local_version(path) + 1
+ current_version = self._db.get_local_file_version(path) + 1
+ print "current version ", current_version
+ metadata['version'] = current_version
metadata['deleted'] = True
emptyUploadable = Data("", self._convergence)
- return self._parent.add_file(name, emptyUploadable, overwrite=True, metadata=metadata)
+ return self._upload_dirnode.add_file(name, emptyUploadable, overwrite=True, metadata=metadata)
d2.addCallback(set_deleted)
d2.addCallback(lambda x: Exception("file does not exist"))
return d2
if os.path.isdir(path):
return _add_dir(name)
elif os.path.isfile(path):
- version = get_local_version(path)
+ version = self._db.get_local_file_version(path)
+ if version is None:
+ version = 1
+ else:
+ version += 1
d2 = _add_file(name, version)
def add_db_entry(filenode):
filecap = filenode.get_uri()
size = s[stat.ST_SIZE]
ctime = s[stat.ST_CTIME]
mtime = s[stat.ST_MTIME]
- self._db.did_upload_file(filecap, path, mtime, ctime, size)
+ self._db.did_upload_file(filecap, path, version, mtime, ctime, size)
self._stats_provider.count('magic_folder.files_uploaded', 1)
d2.addCallback(add_db_entry)
return d2
self.alice_collective_dir, self.alice_upload_dircap, self.alice_magicfolder, self.bob_collective_dircap, self.bob_upload_dircap, self.bob_magicfolder = result
d.addCallback(get_results)
- def write_a_file(result):
- file_path = os.path.join(self.alice_magicfolder._local_dir, "file1")
- fileutil.write(file_path, "meow, meow meow. meow? meow meow! meow.")
+ def Alice_write_a_file(result):
+ print "Alice writes a file\n"
+ self.file_path = os.path.join(self.alice_magicfolder._local_dir, "file1")
+ fileutil.write(self.file_path, "meow, meow meow. meow? meow meow! meow.")
# XXX fix me --> self.notify(file_path, self.inotify.IN_CLOSE_WRITE)
- d.addCallback(write_a_file)
+ d.addCallback(Alice_write_a_file)
- def wait_for_upload(result):
+ def Alice_wait_for_upload(result):
+ print "Alice waits for an upload\n"
d2 = defer.Deferred()
self.alice_magicfolder.set_processed_callback(d2.callback, ignore_count=0)
return d2
- d.addCallback(wait_for_upload)
- def prepare_for_alice_stats(result):
+ d.addCallback(Alice_wait_for_upload)
+ def Alice_prepare_for_alice_stats(result):
self.stats_provider = self.alice_magicfolder._client.stats_provider
- d.addCallback(prepare_for_alice_stats)
+ d.addCallback(Alice_prepare_for_alice_stats)
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('magic_folder.objects_succeeded'), 1))
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('magic_folder.files_uploaded'), 1))
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('magic_folder.objects_queued'), 0))
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('magic_folder.directories_created'), 0))
- def wait_for_download(result):
+ def Bob_wait_for_download(result):
+ print "Bob waits for a download\n"
d2 = defer.Deferred()
self.bob_magicfolder.set_download_callback(d2.callback, ignore_count=0)
return d2
- d.addCallback(wait_for_download)
- def prepare_for_bob_stats(result):
+ d.addCallback(Bob_wait_for_download)
+ def Bob_prepare_for_stats(result):
self.stats_provider = self.bob_magicfolder._client.stats_provider
- d.addCallback(prepare_for_bob_stats)
+ d.addCallback(Bob_prepare_for_stats)
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('magic_folder.objects_downloaded'), 1))
+ # test deletion of file behavior
+ def Alice_delete_file(result):
+ print "Alice deletes the file!\n"
+ os.unlink(self.file_path)
+ self.notify(self.file_path, self.inotify.IN_DELETE)
+ return None
+ d.addCallback(Alice_delete_file)
+ d.addCallback(Alice_wait_for_upload)
+ d.addCallback(Alice_prepare_for_alice_stats)
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('magic_folder.objects_succeeded'), 2)) # XXX ?
+ d.addCallback(Bob_wait_for_download)
+ d.addCallback(Bob_prepare_for_stats)
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('magic_folder.objects_downloaded'), 2)) # XXX ?
+
def cleanup_Alice_and_Bob(result):
d = defer.succeed(None)
d.addCallback(lambda ign: self.alice_magicfolder.finish(for_tests=True))