]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/scripts/backupdb.py
Add get_pathinfo.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / scripts / backupdb.py
1
2 import os.path, sys, time, random, stat
3
4 from allmydata.util.netstring import netstring
5 from allmydata.util.hashutil import backupdb_dirhash
6 from allmydata.util import base32
7 from allmydata.util.fileutil import abspath_expanduser_unicode
8 from allmydata.util.encodingutil import to_str
9 from allmydata.util.dbutil import get_db, DBError
10
11
12 DAY = 24*60*60
13 MONTH = 30*DAY
14
15 SCHEMA_v1 = """
16 CREATE TABLE version -- added in v1
17 (
18  version INTEGER  -- contains one row, set to 2
19 );
20
21 CREATE TABLE local_files -- added in v1
22 (
23  path  VARCHAR(1024) PRIMARY KEY, -- index, this is an absolute UTF-8-encoded local filename
24  size  INTEGER,       -- os.stat(fn)[stat.ST_SIZE]
25  mtime NUMBER,        -- os.stat(fn)[stat.ST_MTIME]
26  ctime NUMBER,        -- os.stat(fn)[stat.ST_CTIME]
27  fileid INTEGER
28 );
29
30 CREATE TABLE caps -- added in v1
31 (
32  fileid INTEGER PRIMARY KEY AUTOINCREMENT,
33  filecap VARCHAR(256) UNIQUE       -- URI:CHK:...
34 );
35
36 CREATE TABLE last_upload -- added in v1
37 (
38  fileid INTEGER PRIMARY KEY,
39  last_uploaded TIMESTAMP,
40  last_checked TIMESTAMP
41 );
42
43 """
44
45 TABLE_DIRECTORY = """
46
47 CREATE TABLE directories -- added in v2
48 (
49  dirhash varchar(256) PRIMARY KEY,  -- base32(dirhash)
50  dircap varchar(256),               -- URI:DIR2-CHK:...
51  last_uploaded TIMESTAMP,
52  last_checked TIMESTAMP
53 );
54
55 """
56
57 SCHEMA_v2 = SCHEMA_v1 + TABLE_DIRECTORY
58
59 UPDATE_v1_to_v2 = TABLE_DIRECTORY + """
60 UPDATE version SET version=2;
61 """
62
63 UPDATERS = {
64     2: UPDATE_v1_to_v2,
65 }
66
67 def get_backupdb(dbfile, stderr=sys.stderr,
68                  create_version=(SCHEMA_v2, 2), just_create=False):
69     # Open or create the given backupdb file. The parent directory must
70     # exist.
71     try:
72         (sqlite3, db) = get_db(dbfile, stderr, create_version, updaters=UPDATERS,
73                                just_create=just_create, dbname="backupdb")
74         return BackupDB_v2(sqlite3, db)
75     except DBError, e:
76         print >>stderr, e
77         return None
78
79
80 class FileResult:
81     def __init__(self, bdb, filecap, should_check,
82                  path, mtime, ctime, size):
83         self.bdb = bdb
84         self.filecap = filecap
85         self.should_check_p = should_check
86
87         self.path = path
88         self.mtime = mtime
89         self.ctime = ctime
90         self.size = size
91
92     def was_uploaded(self):
93         if self.filecap:
94             return self.filecap
95         return False
96
97     def did_upload(self, filecap):
98         self.bdb.did_upload_file(filecap, self.path,
99                                  self.mtime, self.ctime, self.size)
100
101     def should_check(self):
102         return self.should_check_p
103
104     def did_check_healthy(self, results):
105         self.bdb.did_check_file_healthy(self.filecap, results)
106
107
108 class DirectoryResult:
109     def __init__(self, bdb, dirhash, dircap, should_check):
110         self.bdb = bdb
111         self.dircap = dircap
112         self.should_check_p = should_check
113         self.dirhash = dirhash
114
115     def was_created(self):
116         if self.dircap:
117             return self.dircap
118         return False
119
120     def did_create(self, dircap):
121         self.bdb.did_create_directory(dircap, self.dirhash)
122
123     def should_check(self):
124         return self.should_check_p
125
126     def did_check_healthy(self, results):
127         self.bdb.did_check_directory_healthy(self.dircap, results)
128
129
130 class BackupDB_v2:
131     VERSION = 2
132     NO_CHECK_BEFORE = 1*MONTH
133     ALWAYS_CHECK_AFTER = 2*MONTH
134
135     def __init__(self, sqlite_module, connection):
136         self.sqlite_module = sqlite_module
137         self.connection = connection
138         self.cursor = connection.cursor()
139
140     def check_file(self, path, use_timestamps=True):
141         """I will tell you if a given local file needs to be uploaded or not,
142         by looking in a database and seeing if I have a record of this file
143         having been uploaded earlier.
144
145         I return a FileResults object, synchronously. If r.was_uploaded()
146         returns False, you should upload the file. When you are finished
147         uploading it, call r.did_upload(filecap), so I can update my
148         database.
149
150         If was_uploaded() returns a filecap, you might be able to avoid an
151         upload. Call r.should_check(), and if it says False, you can skip the
152         upload and use the filecap returned by was_uploaded().
153
154         If should_check() returns True, you should perform a filecheck on the
155         filecap returned by was_uploaded(). If the check indicates the file
156         is healthy, please call r.did_check_healthy(checker_results) so I can
157         update the database, using the de-JSONized response from the webapi
158         t=check call for 'checker_results'. If the check indicates the file
159         is not healthy, please upload the file and call r.did_upload(filecap)
160         when you're done.
161
162         If use_timestamps=True (the default), I will compare ctime and mtime
163         of the local file against an entry in my database, and consider the
164         file to be unchanged if ctime, mtime, and filesize are all the same
165         as the earlier version. If use_timestamps=False, I will not trust the
166         timestamps, so more files (perhaps all) will be marked as needing
167         upload. A future version of this database may hash the file to make
168         equality decisions, in which case use_timestamps=False will not
169         always imply r.must_upload()==True.
170
171         'path' points to a local file on disk, possibly relative to the
172         current working directory. The database stores absolute pathnames.
173         """
174
175         path = abspath_expanduser_unicode(path)
176
177         # TODO: consider using get_pathinfo.
178         s = os.stat(path)
179         size = s[stat.ST_SIZE]
180         ctime = s[stat.ST_CTIME]
181         mtime = s[stat.ST_MTIME]
182
183         now = time.time()
184         c = self.cursor
185
186         c.execute("SELECT size,mtime,ctime,fileid"
187                   " FROM local_files"
188                   " WHERE path=?",
189                   (path,))
190         row = self.cursor.fetchone()
191         if not row:
192             return FileResult(self, None, False, path, mtime, ctime, size)
193         (last_size,last_mtime,last_ctime,last_fileid) = row
194
195         c.execute("SELECT caps.filecap, last_upload.last_checked"
196                   " FROM caps,last_upload"
197                   " WHERE caps.fileid=? AND last_upload.fileid=?",
198                   (last_fileid, last_fileid))
199         row2 = c.fetchone()
200
201         if ((last_size != size
202              or not use_timestamps
203              or last_mtime != mtime
204              or last_ctime != ctime) # the file has been changed
205             or (not row2) # we somehow forgot where we put the file last time
206             ):
207             c.execute("DELETE FROM local_files WHERE path=?", (path,))
208             self.connection.commit()
209             return FileResult(self, None, False, path, mtime, ctime, size)
210
211         # at this point, we're allowed to assume the file hasn't been changed
212         (filecap, last_checked) = row2
213         age = now - last_checked
214
215         probability = ((age - self.NO_CHECK_BEFORE) /
216                        (self.ALWAYS_CHECK_AFTER - self.NO_CHECK_BEFORE))
217         probability = min(max(probability, 0.0), 1.0)
218         should_check = bool(random.random() < probability)
219
220         return FileResult(self, to_str(filecap), should_check,
221                           path, mtime, ctime, size)
222
223     def get_or_allocate_fileid_for_cap(self, filecap):
224         # find an existing fileid for this filecap, or insert a new one. The
225         # caller is required to commit() afterwards.
226
227         # mysql has "INSERT ... ON DUPLICATE KEY UPDATE", but not sqlite
228         # sqlite has "INSERT ON CONFLICT REPLACE", but not mysql
229         # So we use INSERT, ignore any error, then a SELECT
230         c = self.cursor
231         try:
232             c.execute("INSERT INTO caps (filecap) VALUES (?)", (filecap,))
233         except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
234             # sqlite3 on sid gives IntegrityError
235             # pysqlite2 (which we don't use, so maybe no longer relevant) on dapper gives OperationalError
236             pass
237         c.execute("SELECT fileid FROM caps WHERE filecap=?", (filecap,))
238         foundrow = c.fetchone()
239         assert foundrow
240         fileid = foundrow[0]
241         return fileid
242
243     def did_upload_file(self, filecap, path, mtime, ctime, size):
244         now = time.time()
245         fileid = self.get_or_allocate_fileid_for_cap(filecap)
246         try:
247             self.cursor.execute("INSERT INTO last_upload VALUES (?,?,?)",
248                                 (fileid, now, now))
249         except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
250             self.cursor.execute("UPDATE last_upload"
251                                 " SET last_uploaded=?, last_checked=?"
252                                 " WHERE fileid=?",
253                                 (now, now, fileid))
254         try:
255             self.cursor.execute("INSERT INTO local_files VALUES (?,?,?,?,?)",
256                                 (path, size, mtime, ctime, fileid))
257         except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
258             self.cursor.execute("UPDATE local_files"
259                                 " SET size=?, mtime=?, ctime=?, fileid=?"
260                                 " WHERE path=?",
261                                 (size, mtime, ctime, fileid, path))
262         self.connection.commit()
263
264     def did_check_file_healthy(self, filecap, results):
265         now = time.time()
266         fileid = self.get_or_allocate_fileid_for_cap(filecap)
267         self.cursor.execute("UPDATE last_upload"
268                             " SET last_checked=?"
269                             " WHERE fileid=?",
270                             (now, fileid))
271         self.connection.commit()
272
273     def check_directory(self, contents):
274         """I will tell you if a new directory needs to be created for a given
275         set of directory contents, or if I know of an existing (immutable)
276         directory that can be used instead.
277
278         'contents' should be a dictionary that maps from child name (a single
279         unicode string) to immutable childcap (filecap or dircap).
280
281         I return a DirectoryResult object, synchronously. If r.was_created()
282         returns False, you should create the directory (with
283         t=mkdir-immutable). When you are finished, call r.did_create(dircap)
284         so I can update my database.
285
286         If was_created() returns a dircap, you might be able to avoid the
287         mkdir. Call r.should_check(), and if it says False, you can skip the
288         mkdir and use the dircap returned by was_created().
289
290         If should_check() returns True, you should perform a check operation
291         on the dircap returned by was_created(). If the check indicates the
292         directory is healthy, please call
293         r.did_check_healthy(checker_results) so I can update the database,
294         using the de-JSONized response from the webapi t=check call for
295         'checker_results'. If the check indicates the directory is not
296         healthy, please repair or re-create the directory and call
297         r.did_create(dircap) when you're done.
298         """
299
300         now = time.time()
301         entries = []
302         for name in contents:
303             entries.append( [name.encode("utf-8"), contents[name]] )
304         entries.sort()
305         data = "".join([netstring(name_utf8)+netstring(cap)
306                         for (name_utf8,cap) in entries])
307         dirhash = backupdb_dirhash(data)
308         dirhash_s = base32.b2a(dirhash)
309         c = self.cursor
310         c.execute("SELECT dircap, last_checked"
311                   " FROM directories WHERE dirhash=?", (dirhash_s,))
312         row = c.fetchone()
313         if not row:
314             return DirectoryResult(self, dirhash_s, None, False)
315         (dircap, last_checked) = row
316         age = now - last_checked
317
318         probability = ((age - self.NO_CHECK_BEFORE) /
319                        (self.ALWAYS_CHECK_AFTER - self.NO_CHECK_BEFORE))
320         probability = min(max(probability, 0.0), 1.0)
321         should_check = bool(random.random() < probability)
322
323         return DirectoryResult(self, dirhash_s, to_str(dircap), should_check)
324
325     def did_create_directory(self, dircap, dirhash):
326         now = time.time()
327         # if the dirhash is already present (i.e. we've re-uploaded an
328         # existing directory, possibly replacing the dircap with a new one),
329         # update the record in place. Otherwise create a new record.)
330         self.cursor.execute("REPLACE INTO directories VALUES (?,?,?,?)",
331                             (dirhash, dircap, now, now))
332         self.connection.commit()
333
334     def did_check_directory_healthy(self, dircap, results):
335         now = time.time()
336         self.cursor.execute("UPDATE directories"
337                             " SET last_checked=?"
338                             " WHERE dircap=?",
339                             (now, dircap))
340         self.connection.commit()