]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/scripts/backupdb.py
8ee4cea77e76e3ae400fdc869ea47192e1faf496
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / scripts / backupdb.py
1
2 import os.path, sys, time, random, stat
3
4 from allmydata.util.netstring import netstring
5 from allmydata.util.hashutil import backupdb_dirhash
6 from allmydata.util import base32
7 from allmydata.util.fileutil import abspath_expanduser_unicode
8 from allmydata.util.encodingutil import to_str
9 from allmydata.util.dbutil import get_db, DBError
10
11
12 DAY = 24*60*60
13 MONTH = 30*DAY
14
15 SCHEMA_v1 = """
16 CREATE TABLE version -- added in v1
17 (
18  version INTEGER  -- contains one row, set to 2
19 );
20
21 CREATE TABLE local_files -- added in v1
22 (
23  path  VARCHAR(1024) PRIMARY KEY, -- index, this is an absolute UTF-8-encoded local filename
24  size  INTEGER,       -- os.stat(fn)[stat.ST_SIZE]
25  mtime NUMBER,        -- os.stat(fn)[stat.ST_MTIME]
26  ctime NUMBER,        -- os.stat(fn)[stat.ST_CTIME]
27  fileid INTEGER
28 );
29
30 CREATE TABLE caps -- added in v1
31 (
32  fileid INTEGER PRIMARY KEY AUTOINCREMENT,
33  filecap VARCHAR(256) UNIQUE       -- URI:CHK:...
34 );
35
36 CREATE TABLE last_upload -- added in v1
37 (
38  fileid INTEGER PRIMARY KEY,
39  last_uploaded TIMESTAMP,
40  last_checked TIMESTAMP
41 );
42
43 """
44
45 TABLE_DIRECTORY = """
46
47 CREATE TABLE directories -- added in v2
48 (
49  dirhash varchar(256) PRIMARY KEY,  -- base32(dirhash)
50  dircap varchar(256),               -- URI:DIR2-CHK:...
51  last_uploaded TIMESTAMP,
52  last_checked TIMESTAMP
53 );
54
55 """
56
57 SCHEMA_v2 = SCHEMA_v1 + TABLE_DIRECTORY
58
59 UPDATE_v1_to_v2 = TABLE_DIRECTORY + """
60 UPDATE version SET version=2;
61 """
62
63 UPDATERS = {
64     2: UPDATE_v1_to_v2,
65 }
66
67 def get_backupdb(dbfile, stderr=sys.stderr,
68                  create_version=(SCHEMA_v2, 2), just_create=False):
69     # open or create the given backupdb file. The parent directory must
70     # exist.
71     try:
72         (sqlite3, db) = get_db(dbfile, stderr, create_version, updaters=UPDATERS,
73                                just_create=just_create, dbname="backupdb")
74         return BackupDB_v2(sqlite3, db)
75     except DBError, e:
76         print >>stderr, e
77         return None
78
79
80 class FileResult:
81     def __init__(self, bdb, filecap, should_check,
82                  path, mtime, ctime, size):
83         self.bdb = bdb
84         self.filecap = filecap
85         self.should_check_p = should_check
86
87         self.path = path
88         self.mtime = mtime
89         self.ctime = ctime
90         self.size = size
91
92     def was_uploaded(self):
93         if self.filecap:
94             return self.filecap
95         return False
96
97     def did_upload(self, filecap):
98         self.bdb.did_upload_file(filecap, self.path,
99                                  self.mtime, self.ctime, self.size)
100
101     def should_check(self):
102         return self.should_check_p
103
104     def did_check_healthy(self, results):
105         self.bdb.did_check_file_healthy(self.filecap, results)
106
107 class DirectoryResult:
108     def __init__(self, bdb, dirhash, dircap, should_check):
109         self.bdb = bdb
110         self.dircap = dircap
111         self.should_check_p = should_check
112         self.dirhash = dirhash
113
114     def was_created(self):
115         if self.dircap:
116             return self.dircap
117         return False
118
119     def did_create(self, dircap):
120         self.bdb.did_create_directory(dircap, self.dirhash)
121
122     def should_check(self):
123         return self.should_check_p
124
125     def did_check_healthy(self, results):
126         self.bdb.did_check_directory_healthy(self.dircap, results)
127
128 class BackupDB_v2:
129     VERSION = 2
130     NO_CHECK_BEFORE = 1*MONTH
131     ALWAYS_CHECK_AFTER = 2*MONTH
132
133     def __init__(self, sqlite_module, connection):
134         self.sqlite_module = sqlite_module
135         self.connection = connection
136         self.cursor = connection.cursor()
137
138     def check_file(self, path, use_timestamps=True):
139         """I will tell you if a given local file needs to be uploaded or not,
140         by looking in a database and seeing if I have a record of this file
141         having been uploaded earlier.
142
143         I return a FileResults object, synchronously. If r.was_uploaded()
144         returns False, you should upload the file. When you are finished
145         uploading it, call r.did_upload(filecap), so I can update my
146         database.
147
148         If was_uploaded() returns a filecap, you might be able to avoid an
149         upload. Call r.should_check(), and if it says False, you can skip the
150         upload and use the filecap returned by was_uploaded().
151
152         If should_check() returns True, you should perform a filecheck on the
153         filecap returned by was_uploaded(). If the check indicates the file
154         is healthy, please call r.did_check_healthy(checker_results) so I can
155         update the database, using the de-JSONized response from the webapi
156         t=check call for 'checker_results'. If the check indicates the file
157         is not healthy, please upload the file and call r.did_upload(filecap)
158         when you're done.
159
160         I use_timestamps=True (the default), I will compare ctime and mtime
161         of the local file against an entry in my database, and consider the
162         file to be unchanged if ctime, mtime, and filesize are all the same
163         as the earlier version. If use_timestamps=False, I will not trust the
164         timestamps, so more files (perhaps all) will be marked as needing
165         upload. A future version of this database may hash the file to make
166         equality decisions, in which case use_timestamps=False will not
167         always imply r.must_upload()==True.
168
169         'path' points to a local file on disk, possibly relative to the
170         current working directory. The database stores absolute pathnames.
171         """
172
173         path = abspath_expanduser_unicode(path)
174         s = os.stat(path)
175         size = s[stat.ST_SIZE]
176         ctime = s[stat.ST_CTIME]
177         mtime = s[stat.ST_MTIME]
178
179         now = time.time()
180         c = self.cursor
181
182         c.execute("SELECT size,mtime,ctime,fileid"
183                   " FROM local_files"
184                   " WHERE path=?",
185                   (path,))
186         row = self.cursor.fetchone()
187         if not row:
188             return FileResult(self, None, False, path, mtime, ctime, size)
189         (last_size,last_mtime,last_ctime,last_fileid) = row
190
191         c.execute("SELECT caps.filecap, last_upload.last_checked"
192                   " FROM caps,last_upload"
193                   " WHERE caps.fileid=? AND last_upload.fileid=?",
194                   (last_fileid, last_fileid))
195         row2 = c.fetchone()
196
197         if ((last_size != size
198              or not use_timestamps
199              or last_mtime != mtime
200              or last_ctime != ctime) # the file has been changed
201             or (not row2) # we somehow forgot where we put the file last time
202             ):
203             c.execute("DELETE FROM local_files WHERE path=?", (path,))
204             self.connection.commit()
205             return FileResult(self, None, False, path, mtime, ctime, size)
206
207         # at this point, we're allowed to assume the file hasn't been changed
208         (filecap, last_checked) = row2
209         age = now - last_checked
210
211         probability = ((age - self.NO_CHECK_BEFORE) /
212                        (self.ALWAYS_CHECK_AFTER - self.NO_CHECK_BEFORE))
213         probability = min(max(probability, 0.0), 1.0)
214         should_check = bool(random.random() < probability)
215
216         return FileResult(self, to_str(filecap), should_check,
217                           path, mtime, ctime, size)
218
219     def get_or_allocate_fileid_for_cap(self, filecap):
220         # find an existing fileid for this filecap, or insert a new one. The
221         # caller is required to commit() afterwards.
222
223         # mysql has "INSERT ... ON DUPLICATE KEY UPDATE", but not sqlite
224         # sqlite has "INSERT ON CONFLICT REPLACE", but not mysql
225         # So we use INSERT, ignore any error, then a SELECT
226         c = self.cursor
227         try:
228             c.execute("INSERT INTO caps (filecap) VALUES (?)", (filecap,))
229         except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
230             # sqlite3 on sid gives IntegrityError
231             # pysqlite2 (which we don't use, so maybe no longer relevant) on dapper gives OperationalError
232             pass
233         c.execute("SELECT fileid FROM caps WHERE filecap=?", (filecap,))
234         foundrow = c.fetchone()
235         assert foundrow
236         fileid = foundrow[0]
237         return fileid
238
239     def did_upload_file(self, filecap, path, mtime, ctime, size):
240         now = time.time()
241         fileid = self.get_or_allocate_fileid_for_cap(filecap)
242         try:
243             self.cursor.execute("INSERT INTO last_upload VALUES (?,?,?)",
244                                 (fileid, now, now))
245         except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
246             self.cursor.execute("UPDATE last_upload"
247                                 " SET last_uploaded=?, last_checked=?"
248                                 " WHERE fileid=?",
249                                 (now, now, fileid))
250         try:
251             self.cursor.execute("INSERT INTO local_files VALUES (?,?,?,?,?)",
252                                 (path, size, mtime, ctime, fileid))
253         except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
254             self.cursor.execute("UPDATE local_files"
255                                 " SET size=?, mtime=?, ctime=?, fileid=?"
256                                 " WHERE path=?",
257                                 (size, mtime, ctime, fileid, path))
258         self.connection.commit()
259
260     def did_check_file_healthy(self, filecap, results):
261         now = time.time()
262         fileid = self.get_or_allocate_fileid_for_cap(filecap)
263         self.cursor.execute("UPDATE last_upload"
264                             " SET last_checked=?"
265                             " WHERE fileid=?",
266                             (now, fileid))
267         self.connection.commit()
268
269     def check_directory(self, contents):
270         """I will tell you if a new directory needs to be created for a given
271         set of directory contents, or if I know of an existing (immutable)
272         directory that can be used instead.
273
274         'contents' should be a dictionary that maps from child name (a single
275         unicode string) to immutable childcap (filecap or dircap).
276
277         I return a DirectoryResult object, synchronously. If r.was_created()
278         returns False, you should create the directory (with
279         t=mkdir-immutable). When you are finished, call r.did_create(dircap)
280         so I can update my database.
281
282         If was_created() returns a dircap, you might be able to avoid the
283         mkdir. Call r.should_check(), and if it says False, you can skip the
284         mkdir and use the dircap returned by was_created().
285
286         If should_check() returns True, you should perform a check operation
287         on the dircap returned by was_created(). If the check indicates the
288         directory is healthy, please call
289         r.did_check_healthy(checker_results) so I can update the database,
290         using the de-JSONized response from the webapi t=check call for
291         'checker_results'. If the check indicates the directory is not
292         healthy, please repair or re-create the directory and call
293         r.did_create(dircap) when you're done.
294         """
295
296         now = time.time()
297         entries = []
298         for name in contents:
299             entries.append( [name.encode("utf-8"), contents[name]] )
300         entries.sort()
301         data = "".join([netstring(name_utf8)+netstring(cap)
302                         for (name_utf8,cap) in entries])
303         dirhash = backupdb_dirhash(data)
304         dirhash_s = base32.b2a(dirhash)
305         c = self.cursor
306         c.execute("SELECT dircap, last_checked"
307                   " FROM directories WHERE dirhash=?", (dirhash_s,))
308         row = c.fetchone()
309         if not row:
310             return DirectoryResult(self, dirhash_s, None, False)
311         (dircap, last_checked) = row
312         age = now - last_checked
313
314         probability = ((age - self.NO_CHECK_BEFORE) /
315                        (self.ALWAYS_CHECK_AFTER - self.NO_CHECK_BEFORE))
316         probability = min(max(probability, 0.0), 1.0)
317         should_check = bool(random.random() < probability)
318
319         return DirectoryResult(self, dirhash_s, to_str(dircap), should_check)
320
321     def did_create_directory(self, dircap, dirhash):
322         now = time.time()
323         # if the dirhash is already present (i.e. we've re-uploaded an
324         # existing directory, possibly replacing the dircap with a new one),
325         # update the record in place. Otherwise create a new record.)
326         self.cursor.execute("REPLACE INTO directories VALUES (?,?,?,?)",
327                             (dirhash, dircap, now, now))
328         self.connection.commit()
329
330     def did_check_directory_healthy(self, dircap, results):
331         now = time.time()
332         self.cursor.execute("UPDATE directories"
333                             " SET last_checked=?"
334                             " WHERE dircap=?",
335                             (now, dircap))
336         self.connection.commit()