]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/scripts/backupdb.py
backupdb.did_create_directory: use REPLACE INTO, not INSERT INTO + ignore error
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / scripts / backupdb.py
1
2 # the backupdb is only available if sqlite3 is available. Python-2.5.x and
3 # beyond include sqlite3 in the standard library. For python-2.4, the
4 # "pysqlite2" "package" (or "module") (which, despite the confusing name, uses
5 # sqlite3, and which, confusingly, comes in the "pysqlite" "distribution" (or
6 # "package")) must be installed. On debian, install python-pysqlite2
7
8 import os.path, sys, time, random, stat
9 from allmydata.util.netstring import netstring
10 from allmydata.util.hashutil import backupdb_dirhash
11 from allmydata.util import base32
12
13 DAY = 24*60*60
14 MONTH = 30*DAY
15
16 SCHEMA_v1 = """
17 CREATE TABLE version -- added in v1
18 (
19  version INTEGER  -- contains one row, set to 2
20 );
21
22 CREATE TABLE local_files -- added in v1
23 (
24  path  VARCHAR(1024) PRIMARY KEY, -- index, this is os.path.abspath(fn)
25  size  INTEGER,       -- os.stat(fn)[stat.ST_SIZE]
26  mtime NUMBER,        -- os.stat(fn)[stat.ST_MTIME]
27  ctime NUMBER,        -- os.stat(fn)[stat.ST_CTIME]
28  fileid INTEGER
29 );
30
31 CREATE TABLE caps -- added in v1
32 (
33  fileid INTEGER PRIMARY KEY AUTOINCREMENT,
34  filecap VARCHAR(256) UNIQUE       -- URI:CHK:...
35 );
36
37 CREATE TABLE last_upload -- added in v1
38 (
39  fileid INTEGER PRIMARY KEY,
40  last_uploaded TIMESTAMP,
41  last_checked TIMESTAMP
42 );
43
44 """
45
46 TABLE_DIRECTORY = """
47
48 CREATE TABLE directories -- added in v2
49 (
50  dirhash varchar(256) PRIMARY KEY,  -- base32(dirhash)
51  dircap varchar(256),               -- URI:DIR2-CHK:...
52  last_uploaded TIMESTAMP,
53  last_checked TIMESTAMP
54 );
55
56 """
57
58 SCHEMA_v2 = SCHEMA_v1 + TABLE_DIRECTORY
59
60 UPDATE_v1_to_v2 = TABLE_DIRECTORY + """
61 UPDATE version SET version=2;
62 """
63
64
65 def get_backupdb(dbfile, stderr=sys.stderr,
66                  create_version=(SCHEMA_v2, 2), just_create=False):
67     # open or create the given backupdb file. The parent directory must
68     # exist.
69     try:
70         import sqlite3
71         sqlite = sqlite3 # pyflakes whines about 'import sqlite3 as sqlite' ..
72     except ImportError:
73         from pysqlite2 import dbapi2
74         sqlite = dbapi2 # .. when this clause does it too
75         # This import should never fail, because setuptools requires that the
76         # "pysqlite" distribution is present at start time (if on Python < 2.5).
77
78     must_create = not os.path.exists(dbfile)
79     try:
80         db = sqlite.connect(dbfile)
81     except (EnvironmentError, sqlite.OperationalError), e:
82         print >>stderr, "Unable to create/open backupdb file %s: %s" % (dbfile, e)
83         return None
84
85     c = db.cursor()
86     if must_create:
87         schema, version = create_version
88         c.executescript(schema)
89         c.execute("INSERT INTO version (version) VALUES (?)", (version,))
90         db.commit()
91
92     try:
93         c.execute("SELECT version FROM version")
94         version = c.fetchone()[0]
95     except sqlite.DatabaseError, e:
96         # this indicates that the file is not a compatible database format.
97         # Perhaps it was created with an old version, or it might be junk.
98         print >>stderr, "backupdb file is unusable: %s" % e
99         return None
100
101     if just_create: # for tests
102         return True
103
104     if version == 1:
105         c.executescript(UPDATE_v1_to_v2)
106         db.commit()
107         version = 2
108     if version == 2:
109         return BackupDB_v2(sqlite, db)
110     print >>stderr, "Unable to handle backupdb version %s" % version
111     return None
112
113 class FileResult:
114     def __init__(self, bdb, filecap, should_check,
115                  path, mtime, ctime, size):
116         self.bdb = bdb
117         self.filecap = filecap
118         self.should_check_p = should_check
119
120         self.path = path
121         self.mtime = mtime
122         self.ctime = ctime
123         self.size = size
124
125     def was_uploaded(self):
126         if self.filecap:
127             return self.filecap
128         return False
129
130     def did_upload(self, filecap):
131         self.bdb.did_upload_file(filecap, self.path,
132                                  self.mtime, self.ctime, self.size)
133
134     def should_check(self):
135         return self.should_check_p
136
137     def did_check_healthy(self, results):
138         self.bdb.did_check_file_healthy(self.filecap, results)
139
140 class DirectoryResult:
141     def __init__(self, bdb, dirhash, dircap, should_check):
142         self.bdb = bdb
143         self.dircap = dircap
144         self.should_check_p = should_check
145         self.dirhash = dirhash
146
147     def was_created(self):
148         if self.dircap:
149             return self.dircap
150         return False
151
152     def did_create(self, dircap):
153         self.bdb.did_create_directory(dircap, self.dirhash)
154
155     def should_check(self):
156         return self.should_check_p
157
158     def did_check_healthy(self, results):
159         self.bdb.did_check_directory_healthy(self.dircap, results)
160
161 class BackupDB_v2:
162     VERSION = 2
163     NO_CHECK_BEFORE = 1*MONTH
164     ALWAYS_CHECK_AFTER = 2*MONTH
165
166     def __init__(self, sqlite_module, connection):
167         self.sqlite_module = sqlite_module
168         self.connection = connection
169         self.cursor = connection.cursor()
170
171     def check_file(self, path, use_timestamps=True):
172         """I will tell you if a given local file needs to be uploaded or not,
173         by looking in a database and seeing if I have a record of this file
174         having been uploaded earlier.
175
176         I return a FileResults object, synchronously. If r.was_uploaded()
177         returns False, you should upload the file. When you are finished
178         uploading it, call r.did_upload(filecap), so I can update my
179         database.
180
181         If was_uploaded() returns a filecap, you might be able to avoid an
182         upload. Call r.should_check(), and if it says False, you can skip the
183         upload and use the filecap returned by was_uploaded().
184
185         If should_check() returns True, you should perform a filecheck on the
186         filecap returned by was_uploaded(). If the check indicates the file
187         is healthy, please call r.did_check_healthy(checker_results) so I can
188         update the database, using the de-JSONized response from the webapi
189         t=check call for 'checker_results'. If the check indicates the file
190         is not healthy, please upload the file and call r.did_upload(filecap)
191         when you're done.
192
193         I use_timestamps=True (the default), I will compare ctime and mtime
194         of the local file against an entry in my database, and consider the
195         file to be unchanged if ctime, mtime, and filesize are all the same
196         as the earlier version. If use_timestamps=False, I will not trust the
197         timestamps, so more files (perhaps all) will be marked as needing
198         upload. A future version of this database may hash the file to make
199         equality decisions, in which case use_timestamps=False will not
200         always imply r.must_upload()==True.
201
202         'path' points to a local file on disk, possibly relative to the
203         current working directory. The database stores absolute pathnames.
204         """
205
206         path = os.path.abspath(path)
207         s = os.stat(path)
208         size = s[stat.ST_SIZE]
209         ctime = s[stat.ST_CTIME]
210         mtime = s[stat.ST_MTIME]
211
212         now = time.time()
213         c = self.cursor
214
215         c.execute("SELECT size,mtime,ctime,fileid"
216                   " FROM local_files"
217                   " WHERE path=?",
218                   (path,))
219         row = self.cursor.fetchone()
220         if not row:
221             return FileResult(self, None, False, path, mtime, ctime, size)
222         (last_size,last_mtime,last_ctime,last_fileid) = row
223
224         c.execute("SELECT caps.filecap, last_upload.last_checked"
225                   " FROM caps,last_upload"
226                   " WHERE caps.fileid=? AND last_upload.fileid=?",
227                   (last_fileid, last_fileid))
228         row2 = c.fetchone()
229
230         if ((last_size != size
231              or not use_timestamps
232              or last_mtime != mtime
233              or last_ctime != ctime) # the file has been changed
234             or (not row2) # we somehow forgot where we put the file last time
235             ):
236             c.execute("DELETE FROM local_files WHERE path=?", (path,))
237             self.connection.commit()
238             return FileResult(self, None, False, path, mtime, ctime, size)
239
240         # at this point, we're allowed to assume the file hasn't been changed
241         (filecap, last_checked) = row2
242         age = now - last_checked
243
244         probability = ((age - self.NO_CHECK_BEFORE) /
245                        (self.ALWAYS_CHECK_AFTER - self.NO_CHECK_BEFORE))
246         probability = min(max(probability, 0.0), 1.0)
247         should_check = bool(random.random() < probability)
248
249         return FileResult(self, str(filecap), should_check,
250                           path, mtime, ctime, size)
251
252     def get_or_allocate_fileid_for_cap(self, filecap):
253         # find an existing fileid for this filecap, or insert a new one. The
254         # caller is required to commit() afterwards.
255
256         # mysql has "INSERT ... ON DUPLICATE KEY UPDATE", but not sqlite
257         # sqlite has "INSERT ON CONFLICT REPLACE", but not mysql
258         # So we use INSERT, ignore any error, then a SELECT
259         c = self.cursor
260         try:
261             c.execute("INSERT INTO caps (filecap) VALUES (?)", (filecap,))
262         except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
263             # sqlite3 on sid gives IntegrityError
264             # pysqlite2 on dapper gives OperationalError
265             pass
266         c.execute("SELECT fileid FROM caps WHERE filecap=?", (filecap,))
267         foundrow = c.fetchone()
268         assert foundrow
269         fileid = foundrow[0]
270         return fileid
271
272     def did_upload_file(self, filecap, path, mtime, ctime, size):
273         now = time.time()
274         fileid = self.get_or_allocate_fileid_for_cap(filecap)
275         try:
276             self.cursor.execute("INSERT INTO last_upload VALUES (?,?,?)",
277                                 (fileid, now, now))
278         except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
279             self.cursor.execute("UPDATE last_upload"
280                                 " SET last_uploaded=?, last_checked=?"
281                                 " WHERE fileid=?",
282                                 (now, now, fileid))
283         try:
284             self.cursor.execute("INSERT INTO local_files VALUES (?,?,?,?,?)",
285                                 (path, size, mtime, ctime, fileid))
286         except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
287             self.cursor.execute("UPDATE local_files"
288                                 " SET size=?, mtime=?, ctime=?, fileid=?"
289                                 " WHERE path=?",
290                                 (size, mtime, ctime, fileid, path))
291         self.connection.commit()
292
293     def did_check_file_healthy(self, filecap, results):
294         now = time.time()
295         fileid = self.get_or_allocate_fileid_for_cap(filecap)
296         self.cursor.execute("UPDATE last_upload"
297                             " SET last_checked=?"
298                             " WHERE fileid=?",
299                             (now, fileid))
300         self.connection.commit()
301
302     def check_directory(self, contents):
303         """I will tell you if a new directory needs to be created for a given
304         set of directory contents, or if I know of an existing (immutable)
305         directory that can be used instead.
306
307         'contents' should be a dictionary that maps from child name (a single
308         unicode string) to immutable childcap (filecap or dircap).
309
310         I return a DirectoryResult object, synchronously. If r.was_created()
311         returns False, you should create the directory (with
312         t=mkdir-immutable). When you are finished, call r.did_create(dircap)
313         so I can update my database.
314
315         If was_created() returns a dircap, you might be able to avoid the
316         mkdir. Call r.should_check(), and if it says False, you can skip the
317         mkdir and use the dircap returned by was_created().
318
319         If should_check() returns True, you should perform a check operation
320         on the dircap returned by was_created(). If the check indicates the
321         directory is healthy, please call
322         r.did_check_healthy(checker_results) so I can update the database,
323         using the de-JSONized response from the webapi t=check call for
324         'checker_results'. If the check indicates the directory is not
325         healthy, please repair or re-create the directory and call
326         r.did_create(dircap) when you're done.
327         """
328
329         now = time.time()
330         entries = []
331         for name in contents:
332             entries.append( [name.encode("utf-8"), contents[name]] )
333         entries.sort()
334         data = "".join([netstring(name_utf8)+netstring(cap)
335                         for (name_utf8,cap) in entries])
336         dirhash = backupdb_dirhash(data)
337         dirhash_s = base32.b2a(dirhash)
338         c = self.cursor
339         c.execute("SELECT dircap, last_checked"
340                   " FROM directories WHERE dirhash=?", (dirhash_s,))
341         row = c.fetchone()
342         if not row:
343             return DirectoryResult(self, dirhash_s, None, False)
344         (dircap, last_checked) = row
345         age = now - last_checked
346
347         probability = ((age - self.NO_CHECK_BEFORE) /
348                        (self.ALWAYS_CHECK_AFTER - self.NO_CHECK_BEFORE))
349         probability = min(max(probability, 0.0), 1.0)
350         should_check = bool(random.random() < probability)
351
352         return DirectoryResult(self, dirhash_s, str(dircap), should_check)
353
354     def did_create_directory(self, dircap, dirhash):
355         now = time.time()
356         # if the dirhash is already present (i.e. we've re-uploaded an
357         # existing directory, possibly replacing the dircap with a new one),
358         # update the record in place. Otherwise create a new record.)
359         self.cursor.execute("REPLACE INTO directories VALUES (?,?,?,?)",
360                             (dirhash, dircap, now, now))
361         self.connection.commit()
362
363     def did_check_directory_healthy(self, dircap, results):
364         now = time.time()
365         self.cursor.execute("UPDATE directories"
366                             " SET last_checked=?"
367                             " WHERE dircap=?",
368                             (now, dircap))
369         self.connection.commit()