raise
+def is_new_file(pathinfo, db_entry):
+ if db_entry is None:
+ return True
+
+ if not pathinfo.exists and db_entry.size is None:
+ return False
+
+ return ((pathinfo.size, pathinfo.ctime, pathinfo.mtime) !=
+ (db_entry.size, db_entry.ctime, db_entry.mtime))
+
+
class MagicFolder(service.MultiService):
name = 'magic-folder'
last_downloaded_timestamp = now # is this correct?
- if self._db.is_new_file(pathinfo, relpath_u):
+ if is_new_file(pathinfo, db_entry):
new_version = db_entry.version + 1
else:
self._log("Not uploading %r" % (relpath_u,))
if db_entry is None:
new_version = 0
- elif self._db.is_new_file(pathinfo, relpath_u):
+ elif is_new_file(pathinfo, db_entry):
new_version = db_entry.version + 1
else:
self._log("Not uploading %r" % (relpath_u,))
(pathinfo.size, pathinfo.mtime, pathinfo.ctime, version, last_uploaded_uri, last_downloaded_uri, last_downloaded_timestamp, relpath_u))
self.connection.commit()
print "committed"
-
- def is_new_file(self, pathinfo, relpath_u):
- """
- Returns true if the file's current pathinfo (size, mtime, and ctime) has
- changed from the pathinfo previously stored in the db.
- """
- c = self.cursor
- c.execute("SELECT size, mtime, ctime"
- " FROM local_files"
- " WHERE path=?",
- (relpath_u,))
- row = self.cursor.fetchone()
- if not row:
- return True
- if not pathinfo.exists and row[0] is None:
- return False
- return (pathinfo.size, pathinfo.mtime, pathinfo.ctime) != row
row = c.fetchone()
self.failUnlessEqual(row, (pathinfo.size, pathinfo.mtime, pathinfo.ctime))
- # Second test uses db.is_new_file instead of SQL query directly
+ # Second test uses magic_folder.is_new_file instead of SQL query directly
# to confirm the previous upload entry in the db.
relpath2 = u"myFile2"
path2 = os.path.join(self.basedir, relpath2)
fileutil.write(path2, "meow\n")
pathinfo = fileutil.get_pathinfo(path2)
db.did_upload_version(relpath2, 0, 'URI:LIT:2', 'URI:LIT:1', 0, pathinfo)
- self.failUnlessFalse(db.is_new_file(pathinfo, relpath2))
+ db_entry = db.get_db_entry(relpath2)
+ self.failUnlessFalse(magic_folder.is_new_file(pathinfo, db_entry))
different_pathinfo = fileutil.PathInfo(isdir=False, isfile=True, islink=False,
exists=True, size=0, mtime=pathinfo.mtime, ctime=pathinfo.ctime)
- self.failUnlessTrue(db.is_new_file(different_pathinfo, relpath2))
+ self.failUnlessTrue(magic_folder.is_new_file(different_pathinfo, db_entry))
def test_magicfolder_start_service(self):
self.set_up_grid()