]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/backupdb.py
WIP
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / backupdb.py
1
2 import os.path, sys, time, random, stat
3
4 from allmydata.util.netstring import netstring
5 from allmydata.util.hashutil import backupdb_dirhash
6 from allmydata.util import base32
7 from allmydata.util.fileutil import abspath_expanduser_unicode
8 from allmydata.util.encodingutil import to_str
9 from allmydata.util.dbutil import get_db, DBError
10
11
12 DAY = 24*60*60
13 MONTH = 30*DAY
14
15 MAIN_SCHEMA = """
16 CREATE TABLE version
17 (
18  version INTEGER  -- contains one row, set to %s
19 );
20
21 CREATE TABLE local_files
22 (
23  path  VARCHAR(1024) PRIMARY KEY, -- index, this is an absolute UTF-8-encoded local filename
24  -- note that size is before mtime and ctime here, but after in function parameters
25  size  INTEGER,       -- os.stat(fn)[stat.ST_SIZE]   (NULL if the file has been deleted)
26  mtime NUMBER,        -- os.stat(fn)[stat.ST_MTIME]
27  ctime NUMBER,        -- os.stat(fn)[stat.ST_CTIME]
28  fileid INTEGER%s
29 );
30
31 CREATE TABLE caps
32 (
33  fileid INTEGER PRIMARY KEY AUTOINCREMENT,
34  filecap VARCHAR(256) UNIQUE       -- URI:CHK:...
35 );
36
37 CREATE TABLE last_upload
38 (
39  fileid INTEGER PRIMARY KEY,
40  last_uploaded TIMESTAMP,
41  last_checked TIMESTAMP
42 );
43
44 """
45
46 SCHEMA_v1 = MAIN_SCHEMA % (1, "")
47
48 TABLE_DIRECTORY = """
49
50 CREATE TABLE directories -- added in v2
51 (
52  dirhash varchar(256) PRIMARY KEY,  -- base32(dirhash)
53  dircap varchar(256),               -- URI:DIR2-CHK:...
54  last_uploaded TIMESTAMP,
55  last_checked TIMESTAMP
56 );
57
58 """
59
60 SCHEMA_v2 = MAIN_SCHEMA % (2, "") + TABLE_DIRECTORY
61
62 UPDATE_v1_to_v2 = TABLE_DIRECTORY + """
63 UPDATE version SET version=2;
64 """
65
66 UPDATERS = {
67     2: UPDATE_v1_to_v2,
68 }
69
70
71 # magic-folder db schema version 3
72 MAGIC_FOLDER_SCHEMA_v3 = """
73 CREATE TABLE version
74 (
75  version INTEGER  -- contains one row, set to %s
76 );
77
78 CREATE TABLE local_files
79 (
80  path  VARCHAR(1024) PRIMARY KEY, -- index, this is an absolute UTF-8-encoded local filename
81  -- note that size is before mtime and ctime here, but after in function parameters
82  size  INTEGER,       -- os.stat(fn)[stat.ST_SIZE]   (NULL if the file has been deleted)
83  mtime NUMBER,        -- os.stat(fn)[stat.ST_MTIME]
84  ctime NUMBER,        -- os.stat(fn)[stat.ST_CTIME]
85  version INTEGER,
86  last_downloaded_uri VARCHAR(256) UNIQUE,       -- URI:CHK:... 
87  last_downloaded_timestamp NUMBER
88 );
89 """ % (3,)
90
91
92 def get_backupdb(dbfile, stderr=sys.stderr,
93                  create_version=(SCHEMA_v2, 2), just_create=False):
94     # Open or create the given backupdb file. The parent directory must
95     # exist.
96     try:
97         (sqlite3, db) = get_db(dbfile, stderr, create_version, updaters=UPDATERS,
98                                just_create=just_create, dbname="backupdb")
99         if create_version[1] in (1, 2):
100             return BackupDB(sqlite3, db)
101         elif create_version[1] == 3:
102             return MagicFolderDB(sqlite3, db)
103         else:
104             print >>stderr, "invalid db schema version specified"
105             return None
106     except DBError, e:
107         print >>stderr, e
108         return None
109
110
111 class FileResult:
112     def __init__(self, bdb, filecap, should_check,
113                  path, mtime, ctime, size):
114         self.bdb = bdb
115         self.filecap = filecap
116         self.should_check_p = should_check
117
118         self.path = path
119         self.mtime = mtime
120         self.ctime = ctime
121         self.size = size
122
123     def was_uploaded(self):
124         if self.filecap:
125             return self.filecap
126         return False
127
128     def did_upload(self, filecap):
129         self.bdb.did_upload_file(filecap, self.path,
130                                  self.mtime, self.ctime, self.size)
131
132     def should_check(self):
133         return self.should_check_p
134
135     def did_check_healthy(self, results):
136         self.bdb.did_check_file_healthy(self.filecap, results)
137
138
139 class DirectoryResult:
140     def __init__(self, bdb, dirhash, dircap, should_check):
141         self.bdb = bdb
142         self.dircap = dircap
143         self.should_check_p = should_check
144         self.dirhash = dirhash
145
146     def was_created(self):
147         if self.dircap:
148             return self.dircap
149         return False
150
151     def did_create(self, dircap):
152         self.bdb.did_create_directory(dircap, self.dirhash)
153
154     def should_check(self):
155         return self.should_check_p
156
157     def did_check_healthy(self, results):
158         self.bdb.did_check_directory_healthy(self.dircap, results)
159
160
161 class BackupDB:
162     VERSION = 2
163     NO_CHECK_BEFORE = 1*MONTH
164     ALWAYS_CHECK_AFTER = 2*MONTH
165
166     def __init__(self, sqlite_module, connection):
167         self.sqlite_module = sqlite_module
168         self.connection = connection
169         self.cursor = connection.cursor()
170
171     def check_file_db_exists(self, path):
172         """I will tell you if a given file has an entry in my database or not
173         by returning True or False.
174         """
175         c = self.cursor
176         c.execute("SELECT size,mtime,ctime,fileid"
177                   " FROM local_files"
178                   " WHERE path=?",
179                   (path,))
180         row = self.cursor.fetchone()
181         if not row:
182             return False
183         else:
184             return True
185
186     def check_file(self, path, use_timestamps=True):
187         """I will tell you if a given local file needs to be uploaded or not,
188         by looking in a database and seeing if I have a record of this file
189         having been uploaded earlier.
190
191         I return a FileResults object, synchronously. If r.was_uploaded()
192         returns False, you should upload the file. When you are finished
193         uploading it, call r.did_upload(filecap), so I can update my
194         database.
195
196         If was_uploaded() returns a filecap, you might be able to avoid an
197         upload. Call r.should_check(), and if it says False, you can skip the
198         upload and use the filecap returned by was_uploaded().
199
200         If should_check() returns True, you should perform a filecheck on the
201         filecap returned by was_uploaded(). If the check indicates the file
202         is healthy, please call r.did_check_healthy(checker_results) so I can
203         update the database, using the de-JSONized response from the webapi
204         t=check call for 'checker_results'. If the check indicates the file
205         is not healthy, please upload the file and call r.did_upload(filecap)
206         when you're done.
207
208         If use_timestamps=True (the default), I will compare mtime and ctime
209         of the local file against an entry in my database, and consider the
210         file to be unchanged if mtime, ctime, and filesize are all the same
211         as the earlier version. If use_timestamps=False, I will not trust the
212         timestamps, so more files (perhaps all) will be marked as needing
213         upload. A future version of this database may hash the file to make
214         equality decisions, in which case use_timestamps=False will not
215         always imply r.must_upload()==True.
216
217         'path' points to a local file on disk, possibly relative to the
218         current working directory. The database stores absolute pathnames.
219         """
220
221         path = abspath_expanduser_unicode(path)
222
223         # XXX consider using get_pathinfo
224         s = os.stat(path)
225         size = s[stat.ST_SIZE]
226         mtime = s[stat.ST_MTIME]
227         ctime = s[stat.ST_CTIME]
228
229         now = time.time()
230         c = self.cursor
231
232         c.execute("SELECT size,mtime,ctime,fileid"
233                   " FROM local_files"
234                   " WHERE path=?",
235                   (path,))
236         row = self.cursor.fetchone()
237         if not row:
238             return FileResult(self, None, False, path, mtime, ctime, size)
239         (last_size,last_mtime,last_ctime,last_fileid) = row
240
241         c.execute("SELECT caps.filecap, last_upload.last_checked"
242                   " FROM caps,last_upload"
243                   " WHERE caps.fileid=? AND last_upload.fileid=?",
244                   (last_fileid, last_fileid))
245         row2 = c.fetchone()
246
247         if ((last_size != size
248              or not use_timestamps
249              or last_mtime != mtime
250              or last_ctime != ctime) # the file has been changed
251             or (not row2) # we somehow forgot where we put the file last time
252             ):
253             c.execute("DELETE FROM local_files WHERE path=?", (path,))
254             self.connection.commit()
255             return FileResult(self, None, False, path, mtime, ctime, size)
256
257         # at this point, we're allowed to assume the file hasn't been changed
258         (filecap, last_checked) = row2
259         age = now - last_checked
260
261         probability = ((age - self.NO_CHECK_BEFORE) /
262                        (self.ALWAYS_CHECK_AFTER - self.NO_CHECK_BEFORE))
263         probability = min(max(probability, 0.0), 1.0)
264         should_check = bool(random.random() < probability)
265
266         return FileResult(self, to_str(filecap), should_check,
267                           path, mtime, ctime, size)
268
269     def get_or_allocate_fileid_for_cap(self, filecap):
270         # find an existing fileid for this filecap, or insert a new one. The
271         # caller is required to commit() afterwards.
272
273         # mysql has "INSERT ... ON DUPLICATE KEY UPDATE", but not sqlite
274         # sqlite has "INSERT ON CONFLICT REPLACE", but not mysql
275         # So we use INSERT, ignore any error, then a SELECT
276         c = self.cursor
277         try:
278             c.execute("INSERT INTO caps (filecap) VALUES (?)", (filecap,))
279         except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
280             # sqlite3 on sid gives IntegrityError
281             # pysqlite2 (which we don't use, so maybe no longer relevant) on dapper gives OperationalError
282             pass
283         c.execute("SELECT fileid FROM caps WHERE filecap=?", (filecap,))
284         foundrow = c.fetchone()
285         assert foundrow
286         fileid = foundrow[0]
287         return fileid
288
289     def did_upload_file(self, filecap, path, mtime, ctime, size):
290         now = time.time()
291         fileid = self.get_or_allocate_fileid_for_cap(filecap)
292         try:
293             self.cursor.execute("INSERT INTO last_upload VALUES (?,?,?)",
294                                 (fileid, now, now))
295         except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
296             self.cursor.execute("UPDATE last_upload"
297                                 " SET last_uploaded=?, last_checked=?"
298                                 " WHERE fileid=?",
299                                 (now, now, fileid))
300         try:
301             self.cursor.execute("INSERT INTO local_files VALUES (?,?,?,?,?)",
302                                 (path, size, mtime, ctime, fileid))
303         except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
304             self.cursor.execute("UPDATE local_files"
305                                 " SET size=?, mtime=?, ctime=?, fileid=?"
306                                 " WHERE path=?",
307                                 (size, mtime, ctime, fileid, path))
308         self.connection.commit()
309
310     def did_check_file_healthy(self, filecap, results):
311         now = time.time()
312         fileid = self.get_or_allocate_fileid_for_cap(filecap)
313         self.cursor.execute("UPDATE last_upload"
314                             " SET last_checked=?"
315                             " WHERE fileid=?",
316                             (now, fileid))
317         self.connection.commit()
318
319     def check_directory(self, contents):
320         """I will tell you if a new directory needs to be created for a given
321         set of directory contents, or if I know of an existing (immutable)
322         directory that can be used instead.
323
324         'contents' should be a dictionary that maps from child name (a single
325         unicode string) to immutable childcap (filecap or dircap).
326
327         I return a DirectoryResult object, synchronously. If r.was_created()
328         returns False, you should create the directory (with
329         t=mkdir-immutable). When you are finished, call r.did_create(dircap)
330         so I can update my database.
331
332         If was_created() returns a dircap, you might be able to avoid the
333         mkdir. Call r.should_check(), and if it says False, you can skip the
334         mkdir and use the dircap returned by was_created().
335
336         If should_check() returns True, you should perform a check operation
337         on the dircap returned by was_created(). If the check indicates the
338         directory is healthy, please call
339         r.did_check_healthy(checker_results) so I can update the database,
340         using the de-JSONized response from the webapi t=check call for
341         'checker_results'. If the check indicates the directory is not
342         healthy, please repair or re-create the directory and call
343         r.did_create(dircap) when you're done.
344         """
345
346         now = time.time()
347         entries = []
348         for name in contents:
349             entries.append( [name.encode("utf-8"), contents[name]] )
350         entries.sort()
351         data = "".join([netstring(name_utf8)+netstring(cap)
352                         for (name_utf8,cap) in entries])
353         dirhash = backupdb_dirhash(data)
354         dirhash_s = base32.b2a(dirhash)
355         c = self.cursor
356         c.execute("SELECT dircap, last_checked"
357                   " FROM directories WHERE dirhash=?", (dirhash_s,))
358         row = c.fetchone()
359         if not row:
360             return DirectoryResult(self, dirhash_s, None, False)
361         (dircap, last_checked) = row
362         age = now - last_checked
363
364         probability = ((age - self.NO_CHECK_BEFORE) /
365                        (self.ALWAYS_CHECK_AFTER - self.NO_CHECK_BEFORE))
366         probability = min(max(probability, 0.0), 1.0)
367         should_check = bool(random.random() < probability)
368
369         return DirectoryResult(self, dirhash_s, to_str(dircap), should_check)
370
371     def did_create_directory(self, dircap, dirhash):
372         now = time.time()
373         # if the dirhash is already present (i.e. we've re-uploaded an
374         # existing directory, possibly replacing the dircap with a new one),
375         # update the record in place. Otherwise create a new record.)
376         self.cursor.execute("REPLACE INTO directories VALUES (?,?,?,?)",
377                             (dirhash, dircap, now, now))
378         self.connection.commit()
379
380     def did_check_directory_healthy(self, dircap, results):
381         now = time.time()
382         self.cursor.execute("UPDATE directories"
383                             " SET last_checked=?"
384                             " WHERE dircap=?",
385                             (now, dircap))
386         self.connection.commit()
387
388
389 class MagicFolderDB():
390     VERSION = 3
391
392     def __init__(self, sqlite_module, connection):
393         self.sqlite_module = sqlite_module
394         self.connection = connection
395         self.cursor = connection.cursor()
396
397     def check_file_db_exists(self, path):
398         """I will tell you if a given file has an entry in my database or not
399         by returning True or False.
400         """
401         c = self.cursor
402         c.execute("SELECT size,mtime,ctime"
403                   " FROM local_files"
404                   " WHERE path=?",
405                   (path,))
406         row = self.cursor.fetchone()
407         if not row:
408             return False
409         else:
410             return True
411
412     def get_all_relpaths(self):
413         """
414         Retrieve a set of all relpaths of files that have had an entry in magic folder db
415         (i.e. that have been downloaded at least once).
416         """
417         self.cursor.execute("SELECT path FROM local_files")
418         rows = self.cursor.fetchall()
419         return set([r[0] for r in rows])
420
421     def get_last_downloaded_uri(self, relpath_u):
422         """
423         Return the last downloaded uri recorded in the magic folder db.
424         If none are found then return None.
425         """
426         c = self.cursor
427         c.execute("SELECT last_downloaded_uri"
428                   " FROM local_files"
429                   " WHERE path=?",
430                   (relpath_u,))
431         row = self.cursor.fetchone()
432         if not row:
433             return None
434         else:
435             return row[0]
436
437     def get_local_file_version(self, relpath_u):
438         """
439         Return the version of a local file tracked by our magic folder db.
440         If no db entry is found then return None.
441         """
442         c = self.cursor
443         c.execute("SELECT version"
444                   " FROM local_files"
445                   " WHERE path=?",
446                   (relpath_u,))
447         row = self.cursor.fetchone()
448         if not row:
449             return None
450         else:
451             return row[0]
452
453     def did_upload_version(self, filecap, relpath_u, version, pathinfo):
454         print "did_upload_version(%r, %r, %r, %r)" % (filecap, relpath_u, version, pathinfo)
455         now = time.time()
456         try:
457             print "insert"
458             self.cursor.execute("INSERT INTO local_files VALUES (?,?,?,?,?,?)",
459                                 (relpath_u, pathinfo.size, pathinfo.mtime, pathinfo.ctime, version, filecap, pathinfo.mtime))
460         except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
461             print "err... update"
462             try:
463                 self.cursor.execute("UPDATE local_files"
464                                     " SET size=?, mtime=?, ctime=?, version=?, last_downloaded_uri=?, last_downloaded_timestamp=?"
465                                     " WHERE path=?",
466                                     (pathinfo.size, pathinfo.mtime, pathinfo.ctime, version, filecap, pathinfo.mtime, relpath_u))
467             except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
468                 print "WTF BBQ OMG"
469         self.connection.commit()
470         print "commited"
471
472     def is_new_file(self, pathinfo, relpath_u):
473         """
474         Returns true if the file's current pathinfo (size, mtime, and ctime) has
475         changed from the pathinfo previously stored in the db.
476         """
477         #print "is_new_file(%r, %r)" % (pathinfo, relpath_u)
478         c = self.cursor
479         c.execute("SELECT size, mtime, ctime"
480                   " FROM local_files"
481                   " WHERE path=?",
482                   (relpath_u,))
483         row = self.cursor.fetchone()
484         if not row:
485             return True
486         return (pathinfo.size, pathinfo.mtime, pathinfo.ctime) != row