]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/backupdb.py
Fix a subtle bug in the overwrite algorithm.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / backupdb.py
1
2 import os.path, sys, time, random, stat
3
4 from allmydata.util.netstring import netstring
5 from allmydata.util.hashutil import backupdb_dirhash
6 from allmydata.util import base32
7 from allmydata.util.fileutil import abspath_expanduser_unicode
8 from allmydata.util.encodingutil import to_str
9 from allmydata.util.dbutil import get_db, DBError
10
11
12 DAY = 24*60*60
13 MONTH = 30*DAY
14
15 MAIN_SCHEMA = """
16 CREATE TABLE version
17 (
18  version INTEGER  -- contains one row, set to %s
19 );
20
21 CREATE TABLE local_files
22 (
23  path  VARCHAR(1024) PRIMARY KEY, -- index, this is an absolute UTF-8-encoded local filename
24  -- note that size is before mtime and ctime here, but after in function parameters
25  size  INTEGER,       -- os.stat(fn)[stat.ST_SIZE]   (NULL if the file has been deleted)
26  mtime NUMBER,        -- os.stat(fn)[stat.ST_MTIME]
27  ctime NUMBER,        -- os.stat(fn)[stat.ST_CTIME]
28  fileid INTEGER%s
29 );
30
31 CREATE TABLE caps
32 (
33  fileid INTEGER PRIMARY KEY AUTOINCREMENT,
34  filecap VARCHAR(256) UNIQUE       -- URI:CHK:...
35 );
36
37 CREATE TABLE last_upload
38 (
39  fileid INTEGER PRIMARY KEY,
40  last_uploaded TIMESTAMP,
41  last_checked TIMESTAMP
42 );
43
44 """
45
46 SCHEMA_v1 = MAIN_SCHEMA % (1, "")
47
48 TABLE_DIRECTORY = """
49
50 CREATE TABLE directories -- added in v2
51 (
52  dirhash varchar(256) PRIMARY KEY,  -- base32(dirhash)
53  dircap varchar(256),               -- URI:DIR2-CHK:...
54  last_uploaded TIMESTAMP,
55  last_checked TIMESTAMP
56 );
57
58 """
59
60 SCHEMA_v2 = MAIN_SCHEMA % (2, "") + TABLE_DIRECTORY
61
62 UPDATE_v1_to_v2 = TABLE_DIRECTORY + """
63 UPDATE version SET version=2;
64 """
65
66 UPDATERS = {
67     2: UPDATE_v1_to_v2,
68 }
69
70
71 def get_backupdb(dbfile, stderr=sys.stderr,
72                  create_version=(SCHEMA_v2, 2), just_create=False):
73     # Open or create the given backupdb file. The parent directory must
74     # exist.
75     try:
76         (sqlite3, db) = get_db(dbfile, stderr, create_version, updaters=UPDATERS,
77                                just_create=just_create, dbname="backupdb")
78         if create_version[1] in (1, 2):
79             return BackupDB(sqlite3, db)
80         else:
81             print >>stderr, "invalid db schema version specified"
82             return None
83     except DBError, e:
84         print >>stderr, e
85         return None
86
87
88 class FileResult:
89     def __init__(self, bdb, filecap, should_check,
90                  path, mtime, ctime, size):
91         self.bdb = bdb
92         self.filecap = filecap
93         self.should_check_p = should_check
94
95         self.path = path
96         self.mtime = mtime
97         self.ctime = ctime
98         self.size = size
99
100     def was_uploaded(self):
101         if self.filecap:
102             return self.filecap
103         return False
104
105     def did_upload(self, filecap):
106         self.bdb.did_upload_file(filecap, self.path,
107                                  self.mtime, self.ctime, self.size)
108
109     def should_check(self):
110         return self.should_check_p
111
112     def did_check_healthy(self, results):
113         self.bdb.did_check_file_healthy(self.filecap, results)
114
115
116 class DirectoryResult:
117     def __init__(self, bdb, dirhash, dircap, should_check):
118         self.bdb = bdb
119         self.dircap = dircap
120         self.should_check_p = should_check
121         self.dirhash = dirhash
122
123     def was_created(self):
124         if self.dircap:
125             return self.dircap
126         return False
127
128     def did_create(self, dircap):
129         self.bdb.did_create_directory(dircap, self.dirhash)
130
131     def should_check(self):
132         return self.should_check_p
133
134     def did_check_healthy(self, results):
135         self.bdb.did_check_directory_healthy(self.dircap, results)
136
137
138 class BackupDB:
139     VERSION = 2
140     NO_CHECK_BEFORE = 1*MONTH
141     ALWAYS_CHECK_AFTER = 2*MONTH
142
143     def __init__(self, sqlite_module, connection):
144         self.sqlite_module = sqlite_module
145         self.connection = connection
146         self.cursor = connection.cursor()
147
148     def check_file_db_exists(self, path):
149         """I will tell you if a given file has an entry in my database or not
150         by returning True or False.
151         """
152         c = self.cursor
153         c.execute("SELECT size,mtime,ctime,fileid"
154                   " FROM local_files"
155                   " WHERE path=?",
156                   (path,))
157         row = self.cursor.fetchone()
158         if not row:
159             return False
160         else:
161             return True
162
163     def check_file(self, path, use_timestamps=True):
164         """I will tell you if a given local file needs to be uploaded or not,
165         by looking in a database and seeing if I have a record of this file
166         having been uploaded earlier.
167
168         I return a FileResults object, synchronously. If r.was_uploaded()
169         returns False, you should upload the file. When you are finished
170         uploading it, call r.did_upload(filecap), so I can update my
171         database.
172
173         If was_uploaded() returns a filecap, you might be able to avoid an
174         upload. Call r.should_check(), and if it says False, you can skip the
175         upload and use the filecap returned by was_uploaded().
176
177         If should_check() returns True, you should perform a filecheck on the
178         filecap returned by was_uploaded(). If the check indicates the file
179         is healthy, please call r.did_check_healthy(checker_results) so I can
180         update the database, using the de-JSONized response from the webapi
181         t=check call for 'checker_results'. If the check indicates the file
182         is not healthy, please upload the file and call r.did_upload(filecap)
183         when you're done.
184
185         If use_timestamps=True (the default), I will compare mtime and ctime
186         of the local file against an entry in my database, and consider the
187         file to be unchanged if mtime, ctime, and filesize are all the same
188         as the earlier version. If use_timestamps=False, I will not trust the
189         timestamps, so more files (perhaps all) will be marked as needing
190         upload. A future version of this database may hash the file to make
191         equality decisions, in which case use_timestamps=False will not
192         always imply r.must_upload()==True.
193
194         'path' points to a local file on disk, possibly relative to the
195         current working directory. The database stores absolute pathnames.
196         """
197
198         path = abspath_expanduser_unicode(path)
199
200         # XXX consider using get_pathinfo
201         s = os.stat(path)
202         size = s[stat.ST_SIZE]
203         mtime = s[stat.ST_MTIME]
204         ctime = s[stat.ST_CTIME]
205
206         now = time.time()
207         c = self.cursor
208
209         c.execute("SELECT size,mtime,ctime,fileid"
210                   " FROM local_files"
211                   " WHERE path=?",
212                   (path,))
213         row = self.cursor.fetchone()
214         if not row:
215             return FileResult(self, None, False, path, mtime, ctime, size)
216         (last_size,last_mtime,last_ctime,last_fileid) = row
217
218         c.execute("SELECT caps.filecap, last_upload.last_checked"
219                   " FROM caps,last_upload"
220                   " WHERE caps.fileid=? AND last_upload.fileid=?",
221                   (last_fileid, last_fileid))
222         row2 = c.fetchone()
223
224         if ((last_size != size
225              or not use_timestamps
226              or last_mtime != mtime
227              or last_ctime != ctime) # the file has been changed
228             or (not row2) # we somehow forgot where we put the file last time
229             ):
230             c.execute("DELETE FROM local_files WHERE path=?", (path,))
231             self.connection.commit()
232             return FileResult(self, None, False, path, mtime, ctime, size)
233
234         # at this point, we're allowed to assume the file hasn't been changed
235         (filecap, last_checked) = row2
236         age = now - last_checked
237
238         probability = ((age - self.NO_CHECK_BEFORE) /
239                        (self.ALWAYS_CHECK_AFTER - self.NO_CHECK_BEFORE))
240         probability = min(max(probability, 0.0), 1.0)
241         should_check = bool(random.random() < probability)
242
243         return FileResult(self, to_str(filecap), should_check,
244                           path, mtime, ctime, size)
245
246     def get_or_allocate_fileid_for_cap(self, filecap):
247         # find an existing fileid for this filecap, or insert a new one. The
248         # caller is required to commit() afterwards.
249
250         # mysql has "INSERT ... ON DUPLICATE KEY UPDATE", but not sqlite
251         # sqlite has "INSERT ON CONFLICT REPLACE", but not mysql
252         # So we use INSERT, ignore any error, then a SELECT
253         c = self.cursor
254         try:
255             c.execute("INSERT INTO caps (filecap) VALUES (?)", (filecap,))
256         except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
257             # sqlite3 on sid gives IntegrityError
258             # pysqlite2 (which we don't use, so maybe no longer relevant) on dapper gives OperationalError
259             pass
260         c.execute("SELECT fileid FROM caps WHERE filecap=?", (filecap,))
261         foundrow = c.fetchone()
262         assert foundrow
263         fileid = foundrow[0]
264         return fileid
265
266     def did_upload_file(self, filecap, path, mtime, ctime, size):
267         now = time.time()
268         fileid = self.get_or_allocate_fileid_for_cap(filecap)
269         try:
270             self.cursor.execute("INSERT INTO last_upload VALUES (?,?,?)",
271                                 (fileid, now, now))
272         except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
273             self.cursor.execute("UPDATE last_upload"
274                                 " SET last_uploaded=?, last_checked=?"
275                                 " WHERE fileid=?",
276                                 (now, now, fileid))
277         try:
278             self.cursor.execute("INSERT INTO local_files VALUES (?,?,?,?,?)",
279                                 (path, size, mtime, ctime, fileid))
280         except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
281             self.cursor.execute("UPDATE local_files"
282                                 " SET size=?, mtime=?, ctime=?, fileid=?"
283                                 " WHERE path=?",
284                                 (size, mtime, ctime, fileid, path))
285         self.connection.commit()
286
287     def did_check_file_healthy(self, filecap, results):
288         now = time.time()
289         fileid = self.get_or_allocate_fileid_for_cap(filecap)
290         self.cursor.execute("UPDATE last_upload"
291                             " SET last_checked=?"
292                             " WHERE fileid=?",
293                             (now, fileid))
294         self.connection.commit()
295
296     def check_directory(self, contents):
297         """I will tell you if a new directory needs to be created for a given
298         set of directory contents, or if I know of an existing (immutable)
299         directory that can be used instead.
300
301         'contents' should be a dictionary that maps from child name (a single
302         unicode string) to immutable childcap (filecap or dircap).
303
304         I return a DirectoryResult object, synchronously. If r.was_created()
305         returns False, you should create the directory (with
306         t=mkdir-immutable). When you are finished, call r.did_create(dircap)
307         so I can update my database.
308
309         If was_created() returns a dircap, you might be able to avoid the
310         mkdir. Call r.should_check(), and if it says False, you can skip the
311         mkdir and use the dircap returned by was_created().
312
313         If should_check() returns True, you should perform a check operation
314         on the dircap returned by was_created(). If the check indicates the
315         directory is healthy, please call
316         r.did_check_healthy(checker_results) so I can update the database,
317         using the de-JSONized response from the webapi t=check call for
318         'checker_results'. If the check indicates the directory is not
319         healthy, please repair or re-create the directory and call
320         r.did_create(dircap) when you're done.
321         """
322
323         now = time.time()
324         entries = []
325         for name in contents:
326             entries.append( [name.encode("utf-8"), contents[name]] )
327         entries.sort()
328         data = "".join([netstring(name_utf8)+netstring(cap)
329                         for (name_utf8,cap) in entries])
330         dirhash = backupdb_dirhash(data)
331         dirhash_s = base32.b2a(dirhash)
332         c = self.cursor
333         c.execute("SELECT dircap, last_checked"
334                   " FROM directories WHERE dirhash=?", (dirhash_s,))
335         row = c.fetchone()
336         if not row:
337             return DirectoryResult(self, dirhash_s, None, False)
338         (dircap, last_checked) = row
339         age = now - last_checked
340
341         probability = ((age - self.NO_CHECK_BEFORE) /
342                        (self.ALWAYS_CHECK_AFTER - self.NO_CHECK_BEFORE))
343         probability = min(max(probability, 0.0), 1.0)
344         should_check = bool(random.random() < probability)
345
346         return DirectoryResult(self, dirhash_s, to_str(dircap), should_check)
347
348     def did_create_directory(self, dircap, dirhash):
349         now = time.time()
350         # if the dirhash is already present (i.e. we've re-uploaded an
351         # existing directory, possibly replacing the dircap with a new one),
352         # update the record in place. Otherwise create a new record.)
353         self.cursor.execute("REPLACE INTO directories VALUES (?,?,?,?)",
354                             (dirhash, dircap, now, now))
355         self.connection.commit()
356
357     def did_check_directory_healthy(self, dircap, results):
358         now = time.time()
359         self.cursor.execute("UPDATE directories"
360                             " SET last_checked=?"
361                             " WHERE dircap=?",
362                             (now, dircap))
363         self.connection.commit()