import os.path, sys, time, random, stat
import sqlite3

from allmydata.util.netstring import netstring
from allmydata.util.hashutil import backupdb_dirhash
from allmydata.util import base32
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.encodingutil import to_str
# Database schema, kept as plain SQL text so it can be fed to
# sqlite's executescript().  SCHEMA_v1 is the original layout;
# TABLE_DIRECTORY was added in v2, and UPDATE_v1_to_v2 is the
# in-place migration script for v1 databases.

SCHEMA_v1 = """
CREATE TABLE version -- added in v1
(
 version INTEGER  -- contains one row, set to 2
);

CREATE TABLE local_files -- added in v1
(
 path  VARCHAR(1024) PRIMARY KEY, -- index, this is an absolute UTF-8-encoded local filename
 size  INTEGER,       -- os.stat(fn)[stat.ST_SIZE]
 mtime NUMBER,        -- os.stat(fn)[stat.ST_MTIME]
 ctime NUMBER,        -- os.stat(fn)[stat.ST_CTIME]
 fileid INTEGER
);

CREATE TABLE caps -- added in v1
(
 fileid INTEGER PRIMARY KEY AUTOINCREMENT,
 filecap VARCHAR(256) UNIQUE  -- URI:CHK:...
);

CREATE TABLE last_upload -- added in v1
(
 fileid INTEGER PRIMARY KEY,
 last_uploaded TIMESTAMP,
 last_checked TIMESTAMP
);
"""

TABLE_DIRECTORY = """
CREATE TABLE directories -- added in v2
(
 dirhash varchar(256) PRIMARY KEY, -- base32(dirhash)
 dircap varchar(256),              -- URI:DIR2-CHK:...
 last_uploaded TIMESTAMP,
 last_checked TIMESTAMP
);
"""

SCHEMA_v2 = SCHEMA_v1 + TABLE_DIRECTORY

UPDATE_v1_to_v2 = TABLE_DIRECTORY + """
UPDATE version SET version=2;
"""
def get_backupdb(dbfile, stderr=sys.stderr,
                 create_version=None, just_create=False):
    """Open or create the given backupdb file.

    The parent directory of 'dbfile' must already exist.

    :param dbfile: path of the sqlite database file to open or create.
    :param stderr: stream that human-readable error reports are written to.
    :param create_version: a (schema_text, version_number) pair used when
        the file does not yet exist.  Defaults to (SCHEMA_v2, 2); the
        default is resolved lazily inside the function so the module-level
        SCHEMA_v2 constant is only needed when actually creating a db.
    :param just_create: for tests -- return True after opening/creating
        instead of wrapping the connection in a BackupDB_v2.
    :return: a BackupDB_v2 instance (or True when just_create=True), or
        None if the file cannot be opened, is not a usable sqlite
        database, or has an unhandled schema version.
    """
    if create_version is None:
        create_version = (SCHEMA_v2, 2)
    must_create = not os.path.exists(dbfile)
    try:
        db = sqlite3.connect(dbfile)
    except (EnvironmentError, sqlite3.OperationalError) as e:
        # 'print >>stderr' was Python-2-only syntax; stderr.write() behaves
        # the same on both Python 2 and 3.
        stderr.write("Unable to create/open backupdb file %s: %s\n" % (dbfile, e))
        return None

    c = db.cursor()
    if must_create:
        schema, version = create_version
        c.executescript(schema)
        c.execute("INSERT INTO version (version) VALUES (?)", (version,))
        db.commit()

    try:
        c.execute("SELECT version FROM version")
        version = c.fetchone()[0]
    except sqlite3.DatabaseError as e:
        # this indicates that the file is not a compatible database format.
        # Perhaps it was created with an old version, or it might be junk.
        stderr.write("backupdb file is unusable: %s\n" % e)
        return None

    if just_create: # for tests
        return True

    if version == 1:
        # upgrade a v1 database in place: add the directories table and
        # bump the recorded schema version
        c.executescript(UPDATE_v1_to_v2)
        db.commit()
        version = 2
    if version == 2:
        return BackupDB_v2(sqlite3, db)
    stderr.write("Unable to handle backupdb version %s\n" % version)
    return None
class FileResult:
    """The outcome of a BackupDB_v2.check_file() lookup.

    Remembers whether (and under which filecap) the local file was
    previously uploaded, plus the stat() info needed to record a new
    upload via did_upload().
    """
    def __init__(self, bdb, filecap, should_check,
                 path, mtime, ctime, size):
        self.bdb = bdb          # owning BackupDB_v2, used for write-backs
        self.filecap = filecap  # previously-recorded filecap, or None
        self.should_check_p = should_check  # precomputed should_check() answer
        self.path = path
        self.mtime = mtime
        self.ctime = ctime
        self.size = size

    def was_uploaded(self):
        # Return the filecap recorded by a previous upload, or False if
        # the file must be (re-)uploaded.
        if self.filecap:
            return self.filecap
        return False

    def did_upload(self, filecap):
        # Record a completed upload of this file in the backupdb.
        self.bdb.did_upload_file(filecap, self.path,
                                 self.mtime, self.ctime, self.size)

    def should_check(self):
        # True if the caller should run a filecheck on the old filecap.
        return self.should_check_p

    def did_check_healthy(self, results):
        # Record a successful filecheck; 'results' is the de-JSONized
        # response from the webapi t=check call.
        self.bdb.did_check_file_healthy(self.filecap, results)
class DirectoryResult:
    """The outcome of a BackupDB_v2.check_directory() lookup.

    Remembers whether an immutable directory with the same contents
    (identified by dirhash) was previously created, and under which
    dircap.
    """
    def __init__(self, bdb, dirhash, dircap, should_check):
        self.bdb = bdb          # owning BackupDB_v2, used for write-backs
        self.dircap = dircap    # previously-recorded dircap, or None
        self.should_check_p = should_check  # precomputed should_check() answer
        self.dirhash = dirhash  # base32 hash identifying the contents

    def was_created(self):
        # Return the dircap recorded for these contents, or False if the
        # directory must be (re-)created.
        if self.dircap:
            return self.dircap
        return False

    def did_create(self, dircap):
        # Record a completed mkdir-immutable in the backupdb.
        self.bdb.did_create_directory(dircap, self.dirhash)

    def should_check(self):
        # True if the caller should run a check on the old dircap.
        return self.should_check_p

    def did_check_healthy(self, results):
        # Record a successful check; 'results' is the de-JSONized
        # response from the webapi t=check call.
        self.bdb.did_check_directory_healthy(self.dircap, results)
DAY = 24*60*60
MONTH = 30*DAY

class BackupDB_v2:
    """Persistent record of which local files and directories have been
    backed up, stored in an sqlite database with the v2 schema.

    check_file()/check_directory() answer "do I need to upload this
    again?"; the did_* methods record completed uploads and successful
    health checks.
    """
    VERSION = 2
    # Entries younger than NO_CHECK_BEFORE are never re-checked; entries
    # older than ALWAYS_CHECK_AFTER always are.  In between, the check
    # probability ramps up linearly with age.
    NO_CHECK_BEFORE = 1*MONTH
    ALWAYS_CHECK_AFTER = 2*MONTH

    def __init__(self, sqlite_module, connection):
        # sqlite_module is kept so we can catch its exception classes
        # (IntegrityError/OperationalError) in the INSERT-then-UPDATE idiom.
        self.sqlite_module = sqlite_module
        self.connection = connection
        self.cursor = connection.cursor()

    def check_file(self, path, use_timestamps=True):
        """I will tell you if a given local file needs to be uploaded or not,
        by looking in a database and seeing if I have a record of this file
        having been uploaded earlier.

        I return a FileResults object, synchronously. If r.was_uploaded()
        returns False, you should upload the file. When you are finished
        uploading it, call r.did_upload(filecap), so I can update my
        database.

        If was_uploaded() returns a filecap, you might be able to avoid an
        upload. Call r.should_check(), and if it says False, you can skip the
        upload and use the filecap returned by was_uploaded().

        If should_check() returns True, you should perform a filecheck on the
        filecap returned by was_uploaded(). If the check indicates the file
        is healthy, please call r.did_check_healthy(checker_results) so I can
        update the database, using the de-JSONized response from the webapi
        t=check call for 'checker_results'. If the check indicates the file
        is not healthy, please upload the file and call r.did_upload(filecap)
        when you're done.

        If use_timestamps=True (the default), I will compare ctime and mtime
        of the local file against an entry in my database, and consider the
        file to be unchanged if ctime, mtime, and filesize are all the same
        as the earlier version. If use_timestamps=False, I will not trust the
        timestamps, so more files (perhaps all) will be marked as needing
        upload. A future version of this database may hash the file to make
        equality decisions, in which case use_timestamps=False will not
        always imply r.must_upload()==True.

        'path' points to a local file on disk, possibly relative to the
        current working directory. The database stores absolute pathnames.
        """
        path = abspath_expanduser_unicode(path)
        s = os.stat(path)
        size = s[stat.ST_SIZE]
        ctime = s[stat.ST_CTIME]
        mtime = s[stat.ST_MTIME]

        now = time.time()
        c = self.cursor

        c.execute("SELECT size,mtime,ctime,fileid"
                  " FROM local_files"
                  " WHERE path=?",
                  (path,))
        row = self.cursor.fetchone()
        if not row:
            return FileResult(self, None, False, path, mtime, ctime, size)
        (last_size, last_mtime, last_ctime, last_fileid) = row

        c.execute("SELECT caps.filecap, last_upload.last_checked"
                  " FROM caps,last_upload"
                  " WHERE caps.fileid=? AND last_upload.fileid=?",
                  (last_fileid, last_fileid))
        row2 = c.fetchone()

        if ((last_size != size
             or not use_timestamps
             or last_mtime != mtime
             or last_ctime != ctime) # the file has been changed
            or (not row2) # we somehow forgot where we put the file last time
            ):
            c.execute("DELETE FROM local_files WHERE path=?", (path,))
            self.connection.commit()
            return FileResult(self, None, False, path, mtime, ctime, size)

        # at this point, we're allowed to assume the file hasn't been changed
        (filecap, last_checked) = row2
        age = now - last_checked

        # linear ramp from 0.0 at NO_CHECK_BEFORE to 1.0 at ALWAYS_CHECK_AFTER
        probability = ((age - self.NO_CHECK_BEFORE) /
                       (self.ALWAYS_CHECK_AFTER - self.NO_CHECK_BEFORE))
        probability = min(max(probability, 0.0), 1.0)
        should_check = bool(random.random() < probability)

        return FileResult(self, to_str(filecap), should_check,
                          path, mtime, ctime, size)

    def get_or_allocate_fileid_for_cap(self, filecap):
        # find an existing fileid for this filecap, or insert a new one. The
        # caller is required to commit() afterwards.

        # mysql has "INSERT ... ON DUPLICATE KEY UPDATE", but not sqlite
        # sqlite has "INSERT ON CONFLICT REPLACE", but not mysql
        # So we use INSERT, ignore any error, then a SELECT
        c = self.cursor
        try:
            c.execute("INSERT INTO caps (filecap) VALUES (?)", (filecap,))
        except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
            # sqlite3 on sid gives IntegrityError
            # pysqlite2 (which we don't use, so maybe no longer relevant) on dapper gives OperationalError
            pass
        c.execute("SELECT fileid FROM caps WHERE filecap=?", (filecap,))
        foundrow = c.fetchone()
        assert foundrow
        fileid = foundrow[0]
        return fileid

    def did_upload_file(self, filecap, path, mtime, ctime, size):
        # Record a completed upload: remember the cap, the upload/check
        # times, and the local file's stat info.
        now = time.time()
        fileid = self.get_or_allocate_fileid_for_cap(filecap)
        try:
            self.cursor.execute("INSERT INTO last_upload VALUES (?,?,?)",
                                (fileid, now, now))
        except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
            # row already exists: refresh the timestamps instead
            self.cursor.execute("UPDATE last_upload"
                                " SET last_uploaded=?, last_checked=?"
                                " WHERE fileid=?",
                                (now, now, fileid))
        try:
            self.cursor.execute("INSERT INTO local_files VALUES (?,?,?,?,?)",
                                (path, size, mtime, ctime, fileid))
        except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
            # path already recorded: update its stat info and fileid
            self.cursor.execute("UPDATE local_files"
                                " SET size=?, mtime=?, ctime=?, fileid=?"
                                " WHERE path=?",
                                (size, mtime, ctime, fileid, path))
        self.connection.commit()

    def did_check_file_healthy(self, filecap, results):
        # Record a successful filecheck by refreshing last_checked.
        # 'results' (the de-JSONized t=check response) is currently unused.
        now = time.time()
        fileid = self.get_or_allocate_fileid_for_cap(filecap)
        self.cursor.execute("UPDATE last_upload"
                            " SET last_checked=?"
                            " WHERE fileid=?",
                            (now, fileid))
        self.connection.commit()

    def check_directory(self, contents):
        """I will tell you if a new directory needs to be created for a given
        set of directory contents, or if I know of an existing (immutable)
        directory that can be used instead.

        'contents' should be a dictionary that maps from child name (a single
        unicode string) to immutable childcap (filecap or dircap).

        I return a DirectoryResult object, synchronously. If r.was_created()
        returns False, you should create the directory (with
        t=mkdir-immutable). When you are finished, call r.did_create(dircap)
        so I can update my database.

        If was_created() returns a dircap, you might be able to avoid the
        mkdir. Call r.should_check(), and if it says False, you can skip the
        mkdir and use the dircap returned by was_created().

        If should_check() returns True, you should perform a check operation
        on the dircap returned by was_created(). If the check indicates the
        directory is healthy, please call
        r.did_check_healthy(checker_results) so I can update the database,
        using the de-JSONized response from the webapi t=check call for
        'checker_results'. If the check indicates the directory is not
        healthy, please repair or re-create the directory and call
        r.did_create(dircap) when you're done.
        """
        now = time.time()
        # build a canonical serialization of the contents, so identical
        # directories hash to the same dirhash
        entries = []
        for name in contents:
            entries.append( [name.encode("utf-8"), contents[name]] )
        entries.sort()
        data = "".join([netstring(name_utf8)+netstring(cap)
                        for (name_utf8, cap) in entries])
        dirhash = backupdb_dirhash(data)
        dirhash_s = base32.b2a(dirhash)
        c = self.cursor
        c.execute("SELECT dircap, last_checked"
                  " FROM directories WHERE dirhash=?", (dirhash_s,))
        row = c.fetchone()
        if not row:
            return DirectoryResult(self, dirhash_s, None, False)
        (dircap, last_checked) = row
        age = now - last_checked

        # linear ramp from 0.0 at NO_CHECK_BEFORE to 1.0 at ALWAYS_CHECK_AFTER
        probability = ((age - self.NO_CHECK_BEFORE) /
                       (self.ALWAYS_CHECK_AFTER - self.NO_CHECK_BEFORE))
        probability = min(max(probability, 0.0), 1.0)
        should_check = bool(random.random() < probability)

        return DirectoryResult(self, dirhash_s, to_str(dircap), should_check)

    def did_create_directory(self, dircap, dirhash):
        now = time.time()
        # if the dirhash is already present (i.e. we've re-uploaded an
        # existing directory, possibly replacing the dircap with a new one),
        # update the record in place. Otherwise create a new record.)
        self.cursor.execute("REPLACE INTO directories VALUES (?,?,?,?)",
                            (dirhash, dircap, now, now))
        self.connection.commit()

    def did_check_directory_healthy(self, dircap, results):
        # Record a successful directory check by refreshing last_checked.
        # 'results' (the de-JSONized t=check response) is currently unused.
        now = time.time()
        self.cursor.execute("UPDATE directories"
                            " SET last_checked=?"
                            " WHERE dircap=?",
                            (now, dircap))
        self.connection.commit()