]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/scripts/backupdb.py
75ee0d9ce0c670c4cc5898b09ea98858a20b629c
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / scripts / backupdb.py
1
2 import os.path, sys, time, random, stat
3
4 from allmydata.util.netstring import netstring
5 from allmydata.util.hashutil import backupdb_dirhash
6 from allmydata.util import base32
7 from allmydata.util.fileutil import abspath_expanduser_unicode
8 from allmydata.util.encodingutil import to_str
9
10 DAY = 24*60*60
11 MONTH = 30*DAY
12
13 SCHEMA_v1 = """
14 CREATE TABLE version -- added in v1
15 (
16  version INTEGER  -- contains one row, set to 2
17 );
18
19 CREATE TABLE local_files -- added in v1
20 (
21  path  VARCHAR(1024) PRIMARY KEY, -- index, this is an absolute UTF-8-encoded local filename
22  size  INTEGER,       -- os.stat(fn)[stat.ST_SIZE]
23  mtime NUMBER,        -- os.stat(fn)[stat.ST_MTIME]
24  ctime NUMBER,        -- os.stat(fn)[stat.ST_CTIME]
25  fileid INTEGER
26 );
27
28 CREATE TABLE caps -- added in v1
29 (
30  fileid INTEGER PRIMARY KEY AUTOINCREMENT,
31  filecap VARCHAR(256) UNIQUE       -- URI:CHK:...
32 );
33
34 CREATE TABLE last_upload -- added in v1
35 (
36  fileid INTEGER PRIMARY KEY,
37  last_uploaded TIMESTAMP,
38  last_checked TIMESTAMP
39 );
40
41 """
42
43 TABLE_DIRECTORY = """
44
45 CREATE TABLE directories -- added in v2
46 (
47  dirhash varchar(256) PRIMARY KEY,  -- base32(dirhash)
48  dircap varchar(256),               -- URI:DIR2-CHK:...
49  last_uploaded TIMESTAMP,
50  last_checked TIMESTAMP
51 );
52
53 """
54
55 SCHEMA_v2 = SCHEMA_v1 + TABLE_DIRECTORY
56
57 UPDATE_v1_to_v2 = TABLE_DIRECTORY + """
58 UPDATE version SET version=2;
59 """
60
61
62 def get_backupdb(dbfile, stderr=sys.stderr,
63                  create_version=(SCHEMA_v2, 2), just_create=False):
64     # open or create the given backupdb file. The parent directory must
65     # exist.
66     import sqlite3
67
68     must_create = not os.path.exists(dbfile)
69     try:
70         db = sqlite3.connect(dbfile)
71     except (EnvironmentError, sqlite3.OperationalError), e:
72         print >>stderr, "Unable to create/open backupdb file %s: %s" % (dbfile, e)
73         return None
74
75     c = db.cursor()
76     if must_create:
77         schema, version = create_version
78         c.executescript(schema)
79         c.execute("INSERT INTO version (version) VALUES (?)", (version,))
80         db.commit()
81
82     try:
83         c.execute("SELECT version FROM version")
84         version = c.fetchone()[0]
85     except sqlite3.DatabaseError, e:
86         # this indicates that the file is not a compatible database format.
87         # Perhaps it was created with an old version, or it might be junk.
88         print >>stderr, "backupdb file is unusable: %s" % e
89         return None
90
91     if just_create: # for tests
92         return True
93
94     if version == 1:
95         c.executescript(UPDATE_v1_to_v2)
96         db.commit()
97         version = 2
98     if version == 2:
99         return BackupDB_v2(sqlite3, db)
100     print >>stderr, "Unable to handle backupdb version %s" % version
101     return None
102
103 class FileResult:
104     def __init__(self, bdb, filecap, should_check,
105                  path, mtime, ctime, size):
106         self.bdb = bdb
107         self.filecap = filecap
108         self.should_check_p = should_check
109
110         self.path = path
111         self.mtime = mtime
112         self.ctime = ctime
113         self.size = size
114
115     def was_uploaded(self):
116         if self.filecap:
117             return self.filecap
118         return False
119
120     def did_upload(self, filecap):
121         self.bdb.did_upload_file(filecap, self.path,
122                                  self.mtime, self.ctime, self.size)
123
124     def should_check(self):
125         return self.should_check_p
126
127     def did_check_healthy(self, results):
128         self.bdb.did_check_file_healthy(self.filecap, results)
129
130 class DirectoryResult:
131     def __init__(self, bdb, dirhash, dircap, should_check):
132         self.bdb = bdb
133         self.dircap = dircap
134         self.should_check_p = should_check
135         self.dirhash = dirhash
136
137     def was_created(self):
138         if self.dircap:
139             return self.dircap
140         return False
141
142     def did_create(self, dircap):
143         self.bdb.did_create_directory(dircap, self.dirhash)
144
145     def should_check(self):
146         return self.should_check_p
147
148     def did_check_healthy(self, results):
149         self.bdb.did_check_directory_healthy(self.dircap, results)
150
151 class BackupDB_v2:
152     VERSION = 2
153     NO_CHECK_BEFORE = 1*MONTH
154     ALWAYS_CHECK_AFTER = 2*MONTH
155
156     def __init__(self, sqlite_module, connection):
157         self.sqlite_module = sqlite_module
158         self.connection = connection
159         self.cursor = connection.cursor()
160
161     def check_file(self, path, use_timestamps=True):
162         """I will tell you if a given local file needs to be uploaded or not,
163         by looking in a database and seeing if I have a record of this file
164         having been uploaded earlier.
165
166         I return a FileResults object, synchronously. If r.was_uploaded()
167         returns False, you should upload the file. When you are finished
168         uploading it, call r.did_upload(filecap), so I can update my
169         database.
170
171         If was_uploaded() returns a filecap, you might be able to avoid an
172         upload. Call r.should_check(), and if it says False, you can skip the
173         upload and use the filecap returned by was_uploaded().
174
175         If should_check() returns True, you should perform a filecheck on the
176         filecap returned by was_uploaded(). If the check indicates the file
177         is healthy, please call r.did_check_healthy(checker_results) so I can
178         update the database, using the de-JSONized response from the webapi
179         t=check call for 'checker_results'. If the check indicates the file
180         is not healthy, please upload the file and call r.did_upload(filecap)
181         when you're done.
182
183         I use_timestamps=True (the default), I will compare ctime and mtime
184         of the local file against an entry in my database, and consider the
185         file to be unchanged if ctime, mtime, and filesize are all the same
186         as the earlier version. If use_timestamps=False, I will not trust the
187         timestamps, so more files (perhaps all) will be marked as needing
188         upload. A future version of this database may hash the file to make
189         equality decisions, in which case use_timestamps=False will not
190         always imply r.must_upload()==True.
191
192         'path' points to a local file on disk, possibly relative to the
193         current working directory. The database stores absolute pathnames.
194         """
195
196         path = abspath_expanduser_unicode(path)
197         s = os.stat(path)
198         size = s[stat.ST_SIZE]
199         ctime = s[stat.ST_CTIME]
200         mtime = s[stat.ST_MTIME]
201
202         now = time.time()
203         c = self.cursor
204
205         c.execute("SELECT size,mtime,ctime,fileid"
206                   " FROM local_files"
207                   " WHERE path=?",
208                   (path,))
209         row = self.cursor.fetchone()
210         if not row:
211             return FileResult(self, None, False, path, mtime, ctime, size)
212         (last_size,last_mtime,last_ctime,last_fileid) = row
213
214         c.execute("SELECT caps.filecap, last_upload.last_checked"
215                   " FROM caps,last_upload"
216                   " WHERE caps.fileid=? AND last_upload.fileid=?",
217                   (last_fileid, last_fileid))
218         row2 = c.fetchone()
219
220         if ((last_size != size
221              or not use_timestamps
222              or last_mtime != mtime
223              or last_ctime != ctime) # the file has been changed
224             or (not row2) # we somehow forgot where we put the file last time
225             ):
226             c.execute("DELETE FROM local_files WHERE path=?", (path,))
227             self.connection.commit()
228             return FileResult(self, None, False, path, mtime, ctime, size)
229
230         # at this point, we're allowed to assume the file hasn't been changed
231         (filecap, last_checked) = row2
232         age = now - last_checked
233
234         probability = ((age - self.NO_CHECK_BEFORE) /
235                        (self.ALWAYS_CHECK_AFTER - self.NO_CHECK_BEFORE))
236         probability = min(max(probability, 0.0), 1.0)
237         should_check = bool(random.random() < probability)
238
239         return FileResult(self, to_str(filecap), should_check,
240                           path, mtime, ctime, size)
241
242     def get_or_allocate_fileid_for_cap(self, filecap):
243         # find an existing fileid for this filecap, or insert a new one. The
244         # caller is required to commit() afterwards.
245
246         # mysql has "INSERT ... ON DUPLICATE KEY UPDATE", but not sqlite
247         # sqlite has "INSERT ON CONFLICT REPLACE", but not mysql
248         # So we use INSERT, ignore any error, then a SELECT
249         c = self.cursor
250         try:
251             c.execute("INSERT INTO caps (filecap) VALUES (?)", (filecap,))
252         except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
253             # sqlite3 on sid gives IntegrityError
254             # pysqlite2 (which we don't use, so maybe no longer relevant) on dapper gives OperationalError
255             pass
256         c.execute("SELECT fileid FROM caps WHERE filecap=?", (filecap,))
257         foundrow = c.fetchone()
258         assert foundrow
259         fileid = foundrow[0]
260         return fileid
261
262     def did_upload_file(self, filecap, path, mtime, ctime, size):
263         now = time.time()
264         fileid = self.get_or_allocate_fileid_for_cap(filecap)
265         try:
266             self.cursor.execute("INSERT INTO last_upload VALUES (?,?,?)",
267                                 (fileid, now, now))
268         except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
269             self.cursor.execute("UPDATE last_upload"
270                                 " SET last_uploaded=?, last_checked=?"
271                                 " WHERE fileid=?",
272                                 (now, now, fileid))
273         try:
274             self.cursor.execute("INSERT INTO local_files VALUES (?,?,?,?,?)",
275                                 (path, size, mtime, ctime, fileid))
276         except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
277             self.cursor.execute("UPDATE local_files"
278                                 " SET size=?, mtime=?, ctime=?, fileid=?"
279                                 " WHERE path=?",
280                                 (size, mtime, ctime, fileid, path))
281         self.connection.commit()
282
283     def did_check_file_healthy(self, filecap, results):
284         now = time.time()
285         fileid = self.get_or_allocate_fileid_for_cap(filecap)
286         self.cursor.execute("UPDATE last_upload"
287                             " SET last_checked=?"
288                             " WHERE fileid=?",
289                             (now, fileid))
290         self.connection.commit()
291
292     def check_directory(self, contents):
293         """I will tell you if a new directory needs to be created for a given
294         set of directory contents, or if I know of an existing (immutable)
295         directory that can be used instead.
296
297         'contents' should be a dictionary that maps from child name (a single
298         unicode string) to immutable childcap (filecap or dircap).
299
300         I return a DirectoryResult object, synchronously. If r.was_created()
301         returns False, you should create the directory (with
302         t=mkdir-immutable). When you are finished, call r.did_create(dircap)
303         so I can update my database.
304
305         If was_created() returns a dircap, you might be able to avoid the
306         mkdir. Call r.should_check(), and if it says False, you can skip the
307         mkdir and use the dircap returned by was_created().
308
309         If should_check() returns True, you should perform a check operation
310         on the dircap returned by was_created(). If the check indicates the
311         directory is healthy, please call
312         r.did_check_healthy(checker_results) so I can update the database,
313         using the de-JSONized response from the webapi t=check call for
314         'checker_results'. If the check indicates the directory is not
315         healthy, please repair or re-create the directory and call
316         r.did_create(dircap) when you're done.
317         """
318
319         now = time.time()
320         entries = []
321         for name in contents:
322             entries.append( [name.encode("utf-8"), contents[name]] )
323         entries.sort()
324         data = "".join([netstring(name_utf8)+netstring(cap)
325                         for (name_utf8,cap) in entries])
326         dirhash = backupdb_dirhash(data)
327         dirhash_s = base32.b2a(dirhash)
328         c = self.cursor
329         c.execute("SELECT dircap, last_checked"
330                   " FROM directories WHERE dirhash=?", (dirhash_s,))
331         row = c.fetchone()
332         if not row:
333             return DirectoryResult(self, dirhash_s, None, False)
334         (dircap, last_checked) = row
335         age = now - last_checked
336
337         probability = ((age - self.NO_CHECK_BEFORE) /
338                        (self.ALWAYS_CHECK_AFTER - self.NO_CHECK_BEFORE))
339         probability = min(max(probability, 0.0), 1.0)
340         should_check = bool(random.random() < probability)
341
342         return DirectoryResult(self, dirhash_s, to_str(dircap), should_check)
343
344     def did_create_directory(self, dircap, dirhash):
345         now = time.time()
346         # if the dirhash is already present (i.e. we've re-uploaded an
347         # existing directory, possibly replacing the dircap with a new one),
348         # update the record in place. Otherwise create a new record.)
349         self.cursor.execute("REPLACE INTO directories VALUES (?,?,?,?)",
350                             (dirhash, dircap, now, now))
351         self.connection.commit()
352
353     def did_check_directory_healthy(self, dircap, results):
354         now = time.time()
355         self.cursor.execute("UPDATE directories"
356                             " SET last_checked=?"
357                             " WHERE dircap=?",
358                             (now, dircap))
359         self.connection.commit()