]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/scripts/backupdb.py
backupdb.py: cosmetics
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / scripts / backupdb.py
1
2 import os.path, sys, time, random, stat
3
4 from allmydata.util.netstring import netstring
5 from allmydata.util.hashutil import backupdb_dirhash
6 from allmydata.util import base32
7 from allmydata.util.fileutil import abspath_expanduser_unicode
8 from allmydata.util.encodingutil import to_str
9 from allmydata.util.dbutil import get_db, DBError
10
11
12 DAY = 24*60*60
13 MONTH = 30*DAY
14
15 SCHEMA_v1 = """
16 CREATE TABLE version -- added in v1
17 (
18  version INTEGER  -- contains one row, set to 2
19 );
20
21 CREATE TABLE local_files -- added in v1
22 (
23  path  VARCHAR(1024) PRIMARY KEY, -- index, this is an absolute UTF-8-encoded local filename
24  size  INTEGER,       -- os.stat(fn)[stat.ST_SIZE]
25  mtime NUMBER,        -- os.stat(fn)[stat.ST_MTIME]
26  ctime NUMBER,        -- os.stat(fn)[stat.ST_CTIME]
27  fileid INTEGER
28 );
29
30 CREATE TABLE caps -- added in v1
31 (
32  fileid INTEGER PRIMARY KEY AUTOINCREMENT,
33  filecap VARCHAR(256) UNIQUE       -- URI:CHK:...
34 );
35
36 CREATE TABLE last_upload -- added in v1
37 (
38  fileid INTEGER PRIMARY KEY,
39  last_uploaded TIMESTAMP,
40  last_checked TIMESTAMP
41 );
42
43 """
44
45 TABLE_DIRECTORY = """
46
47 CREATE TABLE directories -- added in v2
48 (
49  dirhash varchar(256) PRIMARY KEY,  -- base32(dirhash)
50  dircap varchar(256),               -- URI:DIR2-CHK:...
51  last_uploaded TIMESTAMP,
52  last_checked TIMESTAMP
53 );
54
55 """
56
57 SCHEMA_v2 = SCHEMA_v1 + TABLE_DIRECTORY
58
59 UPDATE_v1_to_v2 = TABLE_DIRECTORY + """
60 UPDATE version SET version=2;
61 """
62
63 UPDATERS = {
64     2: UPDATE_v1_to_v2,
65 }
66
67 def get_backupdb(dbfile, stderr=sys.stderr,
68                  create_version=(SCHEMA_v2, 2), just_create=False):
69     # Open or create the given backupdb file. The parent directory must
70     # exist.
71     try:
72         (sqlite3, db) = get_db(dbfile, stderr, create_version, updaters=UPDATERS,
73                                just_create=just_create, dbname="backupdb")
74         return BackupDB_v2(sqlite3, db)
75     except DBError, e:
76         print >>stderr, e
77         return None
78
79
80 class FileResult:
81     def __init__(self, bdb, filecap, should_check,
82                  path, mtime, ctime, size):
83         self.bdb = bdb
84         self.filecap = filecap
85         self.should_check_p = should_check
86
87         self.path = path
88         self.mtime = mtime
89         self.ctime = ctime
90         self.size = size
91
92     def was_uploaded(self):
93         if self.filecap:
94             return self.filecap
95         return False
96
97     def did_upload(self, filecap):
98         self.bdb.did_upload_file(filecap, self.path,
99                                  self.mtime, self.ctime, self.size)
100
101     def should_check(self):
102         return self.should_check_p
103
104     def did_check_healthy(self, results):
105         self.bdb.did_check_file_healthy(self.filecap, results)
106
107
108 class DirectoryResult:
109     def __init__(self, bdb, dirhash, dircap, should_check):
110         self.bdb = bdb
111         self.dircap = dircap
112         self.should_check_p = should_check
113         self.dirhash = dirhash
114
115     def was_created(self):
116         if self.dircap:
117             return self.dircap
118         return False
119
120     def did_create(self, dircap):
121         self.bdb.did_create_directory(dircap, self.dirhash)
122
123     def should_check(self):
124         return self.should_check_p
125
126     def did_check_healthy(self, results):
127         self.bdb.did_check_directory_healthy(self.dircap, results)
128
129
130 class BackupDB_v2:
131     VERSION = 2
132     NO_CHECK_BEFORE = 1*MONTH
133     ALWAYS_CHECK_AFTER = 2*MONTH
134
135     def __init__(self, sqlite_module, connection):
136         self.sqlite_module = sqlite_module
137         self.connection = connection
138         self.cursor = connection.cursor()
139
140     def check_file(self, path, use_timestamps=True):
141         """I will tell you if a given local file needs to be uploaded or not,
142         by looking in a database and seeing if I have a record of this file
143         having been uploaded earlier.
144
145         I return a FileResults object, synchronously. If r.was_uploaded()
146         returns False, you should upload the file. When you are finished
147         uploading it, call r.did_upload(filecap), so I can update my
148         database.
149
150         If was_uploaded() returns a filecap, you might be able to avoid an
151         upload. Call r.should_check(), and if it says False, you can skip the
152         upload and use the filecap returned by was_uploaded().
153
154         If should_check() returns True, you should perform a filecheck on the
155         filecap returned by was_uploaded(). If the check indicates the file
156         is healthy, please call r.did_check_healthy(checker_results) so I can
157         update the database, using the de-JSONized response from the webapi
158         t=check call for 'checker_results'. If the check indicates the file
159         is not healthy, please upload the file and call r.did_upload(filecap)
160         when you're done.
161
162         If use_timestamps=True (the default), I will compare ctime and mtime
163         of the local file against an entry in my database, and consider the
164         file to be unchanged if ctime, mtime, and filesize are all the same
165         as the earlier version. If use_timestamps=False, I will not trust the
166         timestamps, so more files (perhaps all) will be marked as needing
167         upload. A future version of this database may hash the file to make
168         equality decisions, in which case use_timestamps=False will not
169         always imply r.must_upload()==True.
170
171         'path' points to a local file on disk, possibly relative to the
172         current working directory. The database stores absolute pathnames.
173         """
174
175         path = abspath_expanduser_unicode(path)
176         s = os.stat(path)
177         size = s[stat.ST_SIZE]
178         ctime = s[stat.ST_CTIME]
179         mtime = s[stat.ST_MTIME]
180
181         now = time.time()
182         c = self.cursor
183
184         c.execute("SELECT size,mtime,ctime,fileid"
185                   " FROM local_files"
186                   " WHERE path=?",
187                   (path,))
188         row = self.cursor.fetchone()
189         if not row:
190             return FileResult(self, None, False, path, mtime, ctime, size)
191         (last_size,last_mtime,last_ctime,last_fileid) = row
192
193         c.execute("SELECT caps.filecap, last_upload.last_checked"
194                   " FROM caps,last_upload"
195                   " WHERE caps.fileid=? AND last_upload.fileid=?",
196                   (last_fileid, last_fileid))
197         row2 = c.fetchone()
198
199         if ((last_size != size
200              or not use_timestamps
201              or last_mtime != mtime
202              or last_ctime != ctime) # the file has been changed
203             or (not row2) # we somehow forgot where we put the file last time
204             ):
205             c.execute("DELETE FROM local_files WHERE path=?", (path,))
206             self.connection.commit()
207             return FileResult(self, None, False, path, mtime, ctime, size)
208
209         # at this point, we're allowed to assume the file hasn't been changed
210         (filecap, last_checked) = row2
211         age = now - last_checked
212
213         probability = ((age - self.NO_CHECK_BEFORE) /
214                        (self.ALWAYS_CHECK_AFTER - self.NO_CHECK_BEFORE))
215         probability = min(max(probability, 0.0), 1.0)
216         should_check = bool(random.random() < probability)
217
218         return FileResult(self, to_str(filecap), should_check,
219                           path, mtime, ctime, size)
220
221     def get_or_allocate_fileid_for_cap(self, filecap):
222         # find an existing fileid for this filecap, or insert a new one. The
223         # caller is required to commit() afterwards.
224
225         # mysql has "INSERT ... ON DUPLICATE KEY UPDATE", but not sqlite
226         # sqlite has "INSERT ON CONFLICT REPLACE", but not mysql
227         # So we use INSERT, ignore any error, then a SELECT
228         c = self.cursor
229         try:
230             c.execute("INSERT INTO caps (filecap) VALUES (?)", (filecap,))
231         except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
232             # sqlite3 on sid gives IntegrityError
233             # pysqlite2 (which we don't use, so maybe no longer relevant) on dapper gives OperationalError
234             pass
235         c.execute("SELECT fileid FROM caps WHERE filecap=?", (filecap,))
236         foundrow = c.fetchone()
237         assert foundrow
238         fileid = foundrow[0]
239         return fileid
240
241     def did_upload_file(self, filecap, path, mtime, ctime, size):
242         now = time.time()
243         fileid = self.get_or_allocate_fileid_for_cap(filecap)
244         try:
245             self.cursor.execute("INSERT INTO last_upload VALUES (?,?,?)",
246                                 (fileid, now, now))
247         except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
248             self.cursor.execute("UPDATE last_upload"
249                                 " SET last_uploaded=?, last_checked=?"
250                                 " WHERE fileid=?",
251                                 (now, now, fileid))
252         try:
253             self.cursor.execute("INSERT INTO local_files VALUES (?,?,?,?,?)",
254                                 (path, size, mtime, ctime, fileid))
255         except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
256             self.cursor.execute("UPDATE local_files"
257                                 " SET size=?, mtime=?, ctime=?, fileid=?"
258                                 " WHERE path=?",
259                                 (size, mtime, ctime, fileid, path))
260         self.connection.commit()
261
262     def did_check_file_healthy(self, filecap, results):
263         now = time.time()
264         fileid = self.get_or_allocate_fileid_for_cap(filecap)
265         self.cursor.execute("UPDATE last_upload"
266                             " SET last_checked=?"
267                             " WHERE fileid=?",
268                             (now, fileid))
269         self.connection.commit()
270
271     def check_directory(self, contents):
272         """I will tell you if a new directory needs to be created for a given
273         set of directory contents, or if I know of an existing (immutable)
274         directory that can be used instead.
275
276         'contents' should be a dictionary that maps from child name (a single
277         unicode string) to immutable childcap (filecap or dircap).
278
279         I return a DirectoryResult object, synchronously. If r.was_created()
280         returns False, you should create the directory (with
281         t=mkdir-immutable). When you are finished, call r.did_create(dircap)
282         so I can update my database.
283
284         If was_created() returns a dircap, you might be able to avoid the
285         mkdir. Call r.should_check(), and if it says False, you can skip the
286         mkdir and use the dircap returned by was_created().
287
288         If should_check() returns True, you should perform a check operation
289         on the dircap returned by was_created(). If the check indicates the
290         directory is healthy, please call
291         r.did_check_healthy(checker_results) so I can update the database,
292         using the de-JSONized response from the webapi t=check call for
293         'checker_results'. If the check indicates the directory is not
294         healthy, please repair or re-create the directory and call
295         r.did_create(dircap) when you're done.
296         """
297
298         now = time.time()
299         entries = []
300         for name in contents:
301             entries.append( [name.encode("utf-8"), contents[name]] )
302         entries.sort()
303         data = "".join([netstring(name_utf8)+netstring(cap)
304                         for (name_utf8,cap) in entries])
305         dirhash = backupdb_dirhash(data)
306         dirhash_s = base32.b2a(dirhash)
307         c = self.cursor
308         c.execute("SELECT dircap, last_checked"
309                   " FROM directories WHERE dirhash=?", (dirhash_s,))
310         row = c.fetchone()
311         if not row:
312             return DirectoryResult(self, dirhash_s, None, False)
313         (dircap, last_checked) = row
314         age = now - last_checked
315
316         probability = ((age - self.NO_CHECK_BEFORE) /
317                        (self.ALWAYS_CHECK_AFTER - self.NO_CHECK_BEFORE))
318         probability = min(max(probability, 0.0), 1.0)
319         should_check = bool(random.random() < probability)
320
321         return DirectoryResult(self, dirhash_s, to_str(dircap), should_check)
322
323     def did_create_directory(self, dircap, dirhash):
324         now = time.time()
325         # if the dirhash is already present (i.e. we've re-uploaded an
326         # existing directory, possibly replacing the dircap with a new one),
327         # update the record in place. Otherwise create a new record.)
328         self.cursor.execute("REPLACE INTO directories VALUES (?,?,?,?)",
329                             (dirhash, dircap, now, now))
330         self.connection.commit()
331
332     def did_check_directory_healthy(self, dircap, results):
333         now = time.time()
334         self.cursor.execute("UPDATE directories"
335                             " SET last_checked=?"
336                             " WHERE dircap=?",
337                             (now, dircap))
338         self.connection.commit()