import os, re, weakref, stat, struct, time

from foolscap import Referenceable
from twisted.application import service
from twisted.internet import defer
from twisted.python import util

from zope.interface import implements
from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
     RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE
from allmydata.util import fileutil, idlib, mathutil
from allmydata.util.assertutil import precondition

from pysqlite2 import dbapi2 as sqlite

# store/
# store/owners.db
# store/shares/incoming # temp dirs named $STORAGEINDEX/$SHARENUM which will be moved to store/shares/$STORAGEINDEX/$SHARENUM on success
# store/shares/$STORAGEINDEX
# store/shares/$STORAGEINDEX/$SHARENUM
# store/shares/$STORAGEINDEX/$SHARENUM/blocksize
# store/shares/$STORAGEINDEX/$SHARENUM/data
# store/shares/$STORAGEINDEX/$SHARENUM/blockhashes
# store/shares/$STORAGEINDEX/$SHARENUM/sharehashtree

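# As a rough sketch of how those paths are derived (illustration only: it
# assumes idlib.b2a converts the binary storage index into a printable
# zbase32 filename, as the init_db() comment below notes):
#
#   si_s = idlib.b2a(storage_index)   # printable zbase32 form of the index
#   incoming = os.path.join("store", "shares", "incoming", si_s, "%d" % shnum)
#   final    = os.path.join("store", "shares", si_s, "%d" % shnum)
#
# A share is written into the incoming path and renamed to the final path
# when its BucketWriter is closed.
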
# $SHARENUM matches this regex (anchored so that only all-digit names count):
NUM_RE=re.compile("^[0-9]+$")

class BucketWriter(Referenceable):
    implements(RIBucketWriter)

    def __init__(self, ss, incominghome, finalhome, size):
        self.ss = ss
        self.incominghome = incominghome
        self.finalhome = finalhome
        self._size = size
        self.closed = False
        self.throw_out_all_data = False
        # touch the file, so later callers will see that we're working on it
        f = open(self.incominghome, 'ab')
        f.close()

    def allocated_size(self):
        return self._size

    def remote_write(self, offset, data):
        precondition(not self.closed)
        precondition(offset >= 0)
        precondition(offset+len(data) <= self._size)
        if self.throw_out_all_data:
            return
        # open for in-place writes: append mode ('ab') would ignore seek()
        # and always write at the end of the file, so honor the offset by
        # using 'rb+' (the file was already created in __init__)
        f = open(self.incominghome, 'rb+')
        f.seek(offset)
        f.write(data)
        f.close()

    def remote_close(self):
        precondition(not self.closed)
        fileutil.rename(self.incominghome, self.finalhome)
        self.closed = True
        filelen = os.stat(self.finalhome)[stat.ST_SIZE]
        self.ss.bucket_writer_closed(self, filelen)


class BucketReader(Referenceable):
    implements(RIBucketReader)

    def __init__(self, home):
        self.home = home

    def remote_read(self, offset, length):
        f = open(self.home, 'rb')
        f.seek(offset)
        return f.read(length)

class StorageServer(service.MultiService, Referenceable):
    implements(RIStorageServer)
    name = 'storageserver'

    def __init__(self, storedir, sizelimit=None, no_storage=False):
        service.MultiService.__init__(self)
        self.storedir = storedir
        sharedir = os.path.join(storedir, "shares")
        fileutil.make_dirs(sharedir)
        self.sharedir = sharedir
        self.sizelimit = sizelimit
        self.no_storage = no_storage
        self.incomingdir = os.path.join(sharedir, 'incoming')
        self._clean_incomplete()
        fileutil.make_dirs(self.incomingdir)
        self._active_writers = weakref.WeakKeyDictionary()

        self.init_db()

        self.measure_size()

    def _clean_incomplete(self):
        fileutil.rm_dir(self.incomingdir)

    def init_db(self):
        # files in storedir whose names contain non-zbase32 characters (like
        # ".") are safe, in that they cannot be accessed or overwritten by
        # clients (whose binary storage_index values are always converted
        # into a filename with idlib.b2a)
        db_file = os.path.join(self.storedir, "owners.db")
        need_to_init_db = not os.path.exists(db_file)
        self._owner_db_con = sqlite.connect(db_file)
        self._owner_db_cur = self._owner_db_con.cursor()
        if need_to_init_db:
            setup_file = util.sibpath(__file__, "owner.sql")
            setup = open(setup_file, "r").read()
            self._owner_db_cur.executescript(setup)
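            # owner.sql is not part of this file; judging from the queries
            # issued below, its schema is assumed to look roughly like this
            # sketch (column types and key declarations are guesses):
            #
            #   CREATE TABLE buckets (
            #       bucket_id     INTEGER PRIMARY KEY AUTOINCREMENT,
            #       storage_index BLOB
            #   );
            #   CREATE TABLE leases (
            #       lease_id      INTEGER PRIMARY KEY AUTOINCREMENT,
            #       bucket_id     INTEGER,
            #       renew_secret  BLOB,
            #       cancel_secret BLOB,
            #       expire_time   NUMBER
            #   );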

    def measure_size(self):
        self.consumed = fileutil.du(self.sharedir)

    def allocated_size(self):
        space = self.consumed
        for bw in self._active_writers:
            space += bw.allocated_size()
        return space

    def remote_allocate_buckets(self, storage_index,
                                renew_secret, cancel_secret,
                                sharenums, allocated_size,
                                canary):
        alreadygot = set()
        bucketwriters = {} # k: shnum, v: BucketWriter
        si_s = idlib.b2a(storage_index)
        space_per_bucket = allocated_size
        no_limits = self.sizelimit is None
        yes_limits = not no_limits
        if yes_limits:
            remaining_space = self.sizelimit - self.allocated_size()
        for shnum in sharenums:
            incominghome = os.path.join(self.incomingdir, si_s, "%d" % shnum)
            finalhome = os.path.join(self.sharedir, si_s, "%d" % shnum)
            if os.path.exists(incominghome) or os.path.exists(finalhome):
                alreadygot.add(shnum)
            elif no_limits or remaining_space >= space_per_bucket:
                fileutil.make_dirs(os.path.join(self.incomingdir, si_s))
                bw = BucketWriter(self, incominghome, finalhome,
                                  space_per_bucket)
                if self.no_storage:
                    bw.throw_out_all_data = True
                bucketwriters[shnum] = bw
                self._active_writers[bw] = 1
                if yes_limits:
                    remaining_space -= space_per_bucket
            else:
                # not enough space to accept this bucket
                pass

        if bucketwriters:
            fileutil.make_dirs(os.path.join(self.sharedir, si_s))

        # now store the secrets somewhere. This requires a variable-length
        # list of (renew, cancel) secret tuples per bucket. Note that this
        # does not need to be kept inside the share itself, if packing
        # efficiency is a concern. For this implementation, we use a sqlite
        # database, which puts everything in a single file.
        self.add_lease(storage_index, renew_secret, cancel_secret)

        return alreadygot, bucketwriters

    def add_lease(self, storage_index, renew_secret, cancel_secret):
        # is the bucket already in our database?
        cur = self._owner_db_cur
        cur.execute("SELECT bucket_id FROM buckets"
                    " WHERE storage_index = ?",
                    (storage_index,))
        res = cur.fetchone()
        if res:
            bucket_id = res[0]
        else:
            cur.execute("INSERT INTO buckets (storage_index)"
                        " values(?)", (storage_index,))
            cur.execute("SELECT bucket_id FROM buckets"
                        " WHERE storage_index = ?",
                        (storage_index,))
            res = cur.fetchone()
            bucket_id = res[0]

        # what time will this lease expire? One month from now.
        expire_time = time.time() + 31*24*60*60

        # now, is this lease already in our database? Since we don't have
        # owners yet, look for a match by renew_secret/cancel_secret
        cur.execute("SELECT lease_id FROM leases"
                    " WHERE renew_secret = ? AND cancel_secret = ?",
                    (renew_secret, cancel_secret))
        res = cur.fetchone()
        if res:
            # yes, so just update the timestamp
            lease_id = res[0]
            cur.execute("UPDATE leases"
                        " SET expire_time = ?"
                        " WHERE lease_id = ?",
                        (expire_time, lease_id))
        else:
            # no, we need to add the lease
            cur.execute("INSERT INTO leases "
                        "(bucket_id, renew_secret, cancel_secret, expire_time)"
                        " values(?,?,?,?)",
                        (bucket_id, renew_secret, cancel_secret, expire_time))
        self._owner_db_con.commit()

    def remote_renew_lease(self, storage_index, renew_secret):
        # find the lease
        cur = self._owner_db_cur
        cur.execute("SELECT leases.lease_id FROM buckets, leases"
                    " WHERE buckets.storage_index = ?"
                    "  AND buckets.bucket_id = leases.bucket_id"
                    "  AND leases.renew_secret = ?",
                    (storage_index, renew_secret))
        res = cur.fetchone()
        if res:
            # found it, now update it. The renewed lease will expire one
            # month from now.
            expire_time = time.time() + 31*24*60*60
            lease_id = res[0]
            cur.execute("UPDATE leases"
                        " SET expire_time = ?"
                        " WHERE lease_id = ?",
                        (expire_time, lease_id))
        else:
            # no such lease
            raise IndexError("No such lease")
        self._owner_db_con.commit()

    def remote_cancel_lease(self, storage_index, cancel_secret):
        # find the lease
        cur = self._owner_db_cur
        cur.execute("SELECT l.lease_id, b.storage_index, b.bucket_id"
                    " FROM buckets b, leases l"
                    " WHERE b.storage_index = ?"
                    "  AND b.bucket_id = l.bucket_id"
                    "  AND l.cancel_secret = ?",
                    (storage_index, cancel_secret))
        res = cur.fetchone()
        if res:
            # found it
            lease_id, storage_index, bucket_id = res
            cur.execute("DELETE FROM leases WHERE lease_id = ?",
                        (lease_id,))
            # was that the last one?
            cur.execute("SELECT COUNT(*) FROM leases WHERE bucket_id = ?",
                        (bucket_id,))
            res = cur.fetchone()
            remaining_leases = res[0]
            if not remaining_leases:
                # delete the share
                cur.execute("DELETE FROM buckets WHERE bucket_id = ?",
                            (bucket_id,))
                self.delete_bucket(storage_index)
        else:
            # no such lease
            raise IndexError("No such lease")
        self._owner_db_con.commit()

    def delete_bucket(self, storage_index):
        storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
        # measure the usage of this directory, to remove it from our current
        # total
        consumed = fileutil.du(storagedir)
        fileutil.rm_dir(storagedir)
        self.consumed -= consumed

    def bucket_writer_closed(self, bw, consumed_size):
        self.consumed += consumed_size
        del self._active_writers[bw]

    def remote_get_buckets(self, storage_index):
        bucketreaders = {} # k: sharenum, v: BucketReader
        storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
        try:
            for f in os.listdir(storagedir):
                if NUM_RE.match(f):
                    br = BucketReader(os.path.join(storagedir, f))
                    bucketreaders[int(f)] = br
        except OSError:
            # Commonly caused by there being no buckets at all.
            pass

        return bucketreaders

"""
Share data is written into a single file. At the start of the file there is
a header of eight four-byte big-endian fields: the segment size, the data
size, and six offsets that indicate where each section starts. Each offset
is measured from the beginning of the file.

0x00: segment size
0x04: data size
0x08: offset of data (=00 00 00 20)
0x0c: offset of plaintext_hash_tree
0x10: offset of crypttext_hash_tree
0x14: offset of block_hashes
0x18: offset of share_hashes
0x1c: offset of uri_extension_length + uri_extension
0x20: start of data
?   : start of plaintext_hash_tree
?   : start of crypttext_hash_tree
?   : start of block_hashes
?   : start of share_hashes
       each share_hash is written as a two-byte (big-endian) hashnum
       followed by the 32-byte SHA-256 hash. We only store the hashes
       necessary to validate the share hash root
?   : start of uri_extension_length (four-byte big-endian value)
?   : start of uri_extension
"""
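
# A worked example of this layout (hypothetical numbers, not taken from any
# real share), assuming HASH_SIZE is 32 as the 32-byte SHA-256 hashes above
# imply: with data_size=25, segment_size=10 (so num_segments=3),
# num_share_hashes=4, and uri_extension_size=500, effective_segments is
# next_power_of_k(3, 2) = 4, so each segment hash tree occupies
# (2*4 - 1) * 32 = 224 bytes and the share-hash chain occupies
# 4 * (2+32) = 136 bytes. The offsets written into the header would be:
#
#   data                = 0x20 (32)
#   plaintext_hash_tree = 32 + 25   = 57
#   crypttext_hash_tree = 57 + 224  = 281
#   block_hashes        = 281 + 224 = 505
#   share_hashes        = 505 + 224 = 729
#   uri_extension       = 729 + 136 = 865
#
# and allocated_size() below would return 865 + 4 + 500 = 1369 bytes.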

def allocated_size(data_size, num_segments, num_share_hashes,
                   uri_extension_size):
    wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes,
                           uri_extension_size)
    uri_extension_starts_at = wbp._offsets['uri_extension']
    return uri_extension_starts_at + 4 + uri_extension_size

class WriteBucketProxy:
    implements(IStorageBucketWriter)
    def __init__(self, rref, data_size, segment_size, num_segments,
                 num_share_hashes, uri_extension_size):
        self._rref = rref
        self._data_size = data_size
        self._segment_size = segment_size
        self._num_segments = num_segments

        effective_segments = mathutil.next_power_of_k(num_segments,2)
        self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
        # how many share hashes are included in each share? This will be
        # about log2(num_shares).
        self._share_hash_size = num_share_hashes * (2+HASH_SIZE)
        # we commit to not sending a uri extension larger than this
        self._uri_extension_size = uri_extension_size

        offsets = self._offsets = {}
        x = 0x20
        offsets['data'] = x
        x += data_size
        offsets['plaintext_hash_tree'] = x
        x += self._segment_hash_size
        offsets['crypttext_hash_tree'] = x
        x += self._segment_hash_size
        offsets['block_hashes'] = x
        x += self._segment_hash_size
        offsets['share_hashes'] = x
        x += self._share_hash_size
        offsets['uri_extension'] = x

        offset_data = struct.pack(">LLLLLLLL",
                                  segment_size,
                                  data_size,
                                  offsets['data'],
                                  offsets['plaintext_hash_tree'],
                                  offsets['crypttext_hash_tree'],
                                  offsets['block_hashes'],
                                  offsets['share_hashes'],
                                  offsets['uri_extension'],
                                  )
        assert len(offset_data) == 8*4
        self._offset_data = offset_data

    def start(self):
        return self._write(0, self._offset_data)

    def put_block(self, segmentnum, data):
        offset = self._offsets['data'] + segmentnum * self._segment_size
        assert offset + len(data) <= self._offsets['uri_extension']
        assert isinstance(data, str)
        if segmentnum < self._num_segments-1:
            precondition(len(data) == self._segment_size,
                         len(data), self._segment_size)
        else:
            precondition(len(data) == (self._data_size -
                                       (self._segment_size *
                                        (self._num_segments - 1))),
                         len(data), self._segment_size)
        return self._write(offset, data)

    def put_plaintext_hashes(self, hashes):
        offset = self._offsets['plaintext_hash_tree']
        assert isinstance(hashes, list)
        data = "".join(hashes)
        precondition(len(data) == self._segment_hash_size,
                     len(data), self._segment_hash_size)
        precondition(offset+len(data) <= self._offsets['crypttext_hash_tree'],
                     offset, len(data), offset+len(data),
                     self._offsets['crypttext_hash_tree'])
        return self._write(offset, data)

    def put_crypttext_hashes(self, hashes):
        offset = self._offsets['crypttext_hash_tree']
        assert isinstance(hashes, list)
        data = "".join(hashes)
        precondition(len(data) == self._segment_hash_size,
                     len(data), self._segment_hash_size)
        precondition(offset + len(data) <= self._offsets['block_hashes'],
                     offset, len(data), offset+len(data),
                     self._offsets['block_hashes'])
        return self._write(offset, data)

    def put_block_hashes(self, blockhashes):
        offset = self._offsets['block_hashes']
        assert isinstance(blockhashes, list)
        data = "".join(blockhashes)
        precondition(len(data) == self._segment_hash_size,
                     len(data), self._segment_hash_size)
        precondition(offset + len(data) <= self._offsets['share_hashes'],
                     offset, len(data), offset+len(data),
                     self._offsets['share_hashes'])
        return self._write(offset, data)

    def put_share_hashes(self, sharehashes):
        # sharehashes is a list of (index, hash) tuples, so they get stored
        # as 2+32=34 bytes each
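        # (illustration: a hypothetical entry (3, h), where h is a 32-byte
        # hash, packs to struct.pack(">H", 3) + h, i.e. "\x00\x03" followed
        # by h, for 34 bytes total)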
        offset = self._offsets['share_hashes']
        assert isinstance(sharehashes, list)
        data = "".join([struct.pack(">H", hashnum) + hashvalue
                        for hashnum,hashvalue in sharehashes])
        precondition(len(data) == self._share_hash_size,
                     len(data), self._share_hash_size)
        precondition(offset + len(data) <= self._offsets['uri_extension'],
                     offset, len(data), offset+len(data),
                     self._offsets['uri_extension'])
        return self._write(offset, data)

    def put_uri_extension(self, data):
        offset = self._offsets['uri_extension']
        assert isinstance(data, str)
        precondition(len(data) <= self._uri_extension_size,
                     len(data), self._uri_extension_size)
        length = struct.pack(">L", len(data))
        return self._write(offset, length+data)

    def _write(self, offset, data):
        # TODO: for small shares, buffer the writes and do just a single call
        return self._rref.callRemote("write", offset, data)

    def close(self):
        return self._rref.callRemote("close")

class ReadBucketProxy:
    implements(IStorageBucketReader)
    def __init__(self, rref):
        self._rref = rref
        self._started = False

    def startIfNecessary(self):
        if self._started:
            return defer.succeed(self)
        d = self.start()
        d.addCallback(lambda res: self)
        return d

    def start(self):
        # TODO: for small shares, read the whole bucket in start()
        d = self._read(0, 8*4)
        d.addCallback(self._parse_offsets)
        return d

    def _parse_offsets(self, data):
        precondition(len(data) == 8*4)
        self._offsets = {}
        self._segment_size = struct.unpack(">L", data[0:4])[0]
        self._data_size = struct.unpack(">L", data[4:8])[0]
        x = 0x08
        for field in ( 'data',
                       'plaintext_hash_tree',
                       'crypttext_hash_tree',
                       'block_hashes',
                       'share_hashes',
                       'uri_extension',
                       ):
            offset = struct.unpack(">L", data[x:x+4])[0]
            x += 4
            self._offsets[field] = offset
        return self._offsets

    def get_block(self, blocknum):
        num_segments = mathutil.div_ceil(self._data_size, self._segment_size)
        if blocknum < num_segments-1:
            size = self._segment_size
        else:
            size = self._data_size % self._segment_size
            if size == 0:
                size = self._segment_size
        offset = self._offsets['data'] + blocknum * self._segment_size
        return self._read(offset, size)

    def _str2l(self, s):
        """ split string (pulled from storage) into a list of blockids """
        return [ s[i:i+HASH_SIZE]
                 for i in range(0, len(s), HASH_SIZE) ]

    def get_plaintext_hashes(self):
        offset = self._offsets['plaintext_hash_tree']
        size = self._offsets['crypttext_hash_tree'] - offset
        d = self._read(offset, size)
        d.addCallback(self._str2l)
        return d

    def get_crypttext_hashes(self):
        offset = self._offsets['crypttext_hash_tree']
        size = self._offsets['block_hashes'] - offset
        d = self._read(offset, size)
        d.addCallback(self._str2l)
        return d

    def get_block_hashes(self):
        offset = self._offsets['block_hashes']
        size = self._offsets['share_hashes'] - offset
        d = self._read(offset, size)
        d.addCallback(self._str2l)
        return d

    def get_share_hashes(self):
        offset = self._offsets['share_hashes']
        size = self._offsets['uri_extension'] - offset
        assert size % (2+HASH_SIZE) == 0
        d = self._read(offset, size)
        def _unpack_share_hashes(data):
            assert len(data) == size
            hashes = []
            for i in range(0, size, 2+HASH_SIZE):
                hashnum = struct.unpack(">H", data[i:i+2])[0]
                hashvalue = data[i+2:i+2+HASH_SIZE]
                hashes.append( (hashnum, hashvalue) )
            return hashes
        d.addCallback(_unpack_share_hashes)
        return d

    def get_uri_extension(self):
        offset = self._offsets['uri_extension']
        d = self._read(offset, 4)
        def _got_length(data):
            length = struct.unpack(">L", data)[0]
            return self._read(offset+4, length)
        d.addCallback(_got_length)
        return d

    def _read(self, offset, length):
        return self._rref.callRemote("read", offset, length)