import os, re, weakref, stat, struct, time

from foolscap import Referenceable
from twisted.application import service
from twisted.internet import defer
from twisted.python import util

from zope.interface import implements
from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
     RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE
from allmydata.util import fileutil, idlib, mathutil
from allmydata.util.assertutil import precondition

from pysqlite2 import dbapi2 as sqlite

# store/
# store/owners.db
# store/shares/incoming
#   incoming/ holds temp dirs named $STORAGEINDEX/$SHARENUM which will be
#   moved to store/shares/$STORAGEINDEX/$SHARENUM on success
# store/shares/$STORAGEINDEX
# store/shares/$STORAGEINDEX/$SHARENUM
# store/shares/$STORAGEINDEX/$SHARENUM/blocksize
# store/shares/$STORAGEINDEX/$SHARENUM/data
# store/shares/$STORAGEINDEX/$SHARENUM/blockhashes
# store/shares/$STORAGEINDEX/$SHARENUM/sharehashtree

# $SHARENUM matches this regex:
NUM_RE = re.compile("^[0-9]+$")

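# A small illustration (an addition, not part of the original module): where a
# given share ends up on disk under the layout sketched above. idlib.b2a()
# turns the binary storage_index into the zbase32 name the server uses for the
# per-bucket directory; the share number is the final path component.
def _example_share_path(storedir, storage_index, sharenum):
    si_s = idlib.b2a(storage_index)
    return os.path.join(storedir, "shares", si_s, "%d" % sharenum)
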
class BucketWriter(Referenceable):
    implements(RIBucketWriter)

    def __init__(self, ss, incominghome, finalhome, size):
        self.ss = ss
        self.incominghome = incominghome
        self.finalhome = finalhome
        self._size = size
        self.closed = False
        self.throw_out_all_data = False
        # touch the file, so later callers will see that we're working on it
        f = open(self.incominghome, 'ab')
        f.close()

    def allocated_size(self):
        return self._size

    def remote_write(self, offset, data):
        precondition(not self.closed)
        precondition(offset >= 0)
        precondition(offset+len(data) <= self._size)
        if self.throw_out_all_data:
            return
        # open for update: append mode would ignore the seek and always write
        # at the end of the file
        f = open(self.incominghome, 'rb+')
        f.seek(offset)
        f.write(data)
        f.close()

    def remote_close(self):
        precondition(not self.closed)
        fileutil.rename(self.incominghome, self.finalhome)
        self.closed = True

        filelen = os.stat(self.finalhome)[stat.ST_SIZE]
        self.ss.bucket_writer_closed(self, filelen)

class BucketReader(Referenceable):
    implements(RIBucketReader)

    def __init__(self, home):
        self.home = home

    def remote_read(self, offset, length):
        f = open(self.home, 'rb')
        f.seek(offset)
        return f.read(length)

class StorageServer(service.MultiService, Referenceable):
    implements(RIStorageServer)
    name = 'storageserver'

    def __init__(self, storedir, sizelimit=None, no_storage=False):
        service.MultiService.__init__(self)
        self.storedir = storedir
        sharedir = os.path.join(storedir, "shares")
        fileutil.make_dirs(sharedir)
        self.sharedir = sharedir
        self.sizelimit = sizelimit
        self.no_storage = no_storage
        self.incomingdir = os.path.join(sharedir, 'incoming')
        self._clean_incomplete()
        fileutil.make_dirs(self.incomingdir)
        self._active_writers = weakref.WeakKeyDictionary()
        self.measure_size()
        self.init_db()

    def _clean_incomplete(self):
        fileutil.rm_dir(self.incomingdir)

    def init_db(self):
        # files in storedir with non-zbase32 characters in them (like ".") are
        # safe, in that they cannot be accessed or overwritten by clients
        # (whose binary storage_index values are always converted into a
        # filename with idlib.b2a)
        db_file = os.path.join(self.storedir, "owners.db")
        need_to_init_db = not os.path.exists(db_file)
        self._owner_db_con = sqlite.connect(db_file)
        self._owner_db_cur = self._owner_db_con.cursor()
        if need_to_init_db:
            setup_file = util.sibpath(__file__, "owner.sql")
            setup = open(setup_file, "r").read()
            self._owner_db_cur.executescript(setup)

    def measure_size(self):
        self.consumed = fileutil.du(self.sharedir)

    def allocated_size(self):
        space = self.consumed
        for bw in self._active_writers:
            space += bw.allocated_size()
        return space

    def remote_allocate_buckets(self, storage_index,
                                renew_secret, cancel_secret,
                                sharenums, allocated_size):
        alreadygot = set()
        bucketwriters = {} # k: shnum, v: BucketWriter
        si_s = idlib.b2a(storage_index)
        space_per_bucket = allocated_size
        no_limits = self.sizelimit is None
        yes_limits = not no_limits
        if yes_limits:
            remaining_space = self.sizelimit - self.allocated_size()
        for shnum in sharenums:
            incominghome = os.path.join(self.incomingdir, si_s, "%d" % shnum)
            finalhome = os.path.join(self.sharedir, si_s, "%d" % shnum)
            if os.path.exists(incominghome) or os.path.exists(finalhome):
                alreadygot.add(shnum)
            elif no_limits or remaining_space >= space_per_bucket:
                fileutil.make_dirs(os.path.join(self.incomingdir, si_s))
                bw = BucketWriter(self, incominghome, finalhome,
                                  space_per_bucket)
                if self.no_storage:
                    bw.throw_out_all_data = True
                bucketwriters[shnum] = bw
                self._active_writers[bw] = 1
                if yes_limits:
                    remaining_space -= space_per_bucket
            else:
                # not enough space to accept this bucket
                pass

        if bucketwriters:
            fileutil.make_dirs(os.path.join(self.sharedir, si_s))

        # now store the secrets somewhere. This requires a
        # variable-length list of (renew,cancel) secret tuples per bucket.
        # Note that this does not need to be kept inside the share itself, if
        # packing efficiency is a concern. For this implementation, we use a
        # sqlite database, which puts everything in a single file.
        self.add_lease(storage_index, renew_secret, cancel_secret)

        return alreadygot, bucketwriters

    def add_lease(self, storage_index, renew_secret, cancel_secret):
        # is the bucket already in our database?
        cur = self._owner_db_cur
        cur.execute("SELECT bucket_id FROM buckets"
                    " WHERE storage_index = ?",
                    (storage_index,))
        res = cur.fetchone()
        if res:
            bucket_id = res[0]
        else:
            # no, create it
            cur.execute("INSERT INTO buckets (storage_index)"
                        " VALUES (?)", (storage_index,))
            cur.execute("SELECT bucket_id FROM buckets"
                        " WHERE storage_index = ?",
                        (storage_index,))
            res = cur.fetchone()
            bucket_id = res[0]

        # what time will this lease expire? One month from now.
        expire_time = time.time() + 31*24*60*60

        # now, is this lease already in our database? Since we don't have
        # owners yet, look for a match by renew_secret/cancel_secret
        cur.execute("SELECT lease_id FROM leases"
                    " WHERE renew_secret = ? AND cancel_secret = ?",
                    (renew_secret, cancel_secret))
        res = cur.fetchone()
        if res:
            # yes, so just update the timestamp
            lease_id = res[0]
            cur.execute("UPDATE leases"
                        " SET expire_time = ?"
                        " WHERE lease_id = ?",
                        (expire_time, lease_id))
        else:
            # no, we need to add the lease
            cur.execute("INSERT INTO leases "
                        "(bucket_id, renew_secret, cancel_secret, expire_time)"
                        " VALUES (?,?,?,?)",
                        (bucket_id, renew_secret, cancel_secret, expire_time))
        self._owner_db_con.commit()

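    # The owner.sql file loaded by init_db() is not reproduced here; judging
    # from the queries in this class it presumably looks roughly like the
    # sketch below (column names inferred from the SQL above, types and
    # constraints are guesses):
    #
    #   CREATE TABLE buckets (
    #       bucket_id     INTEGER PRIMARY KEY AUTOINCREMENT,
    #       storage_index BLOB
    #   );
    #   CREATE TABLE leases (
    #       lease_id      INTEGER PRIMARY KEY AUTOINCREMENT,
    #       bucket_id     INTEGER,
    #       renew_secret  BLOB,
    #       cancel_secret BLOB,
    #       expire_time   NUMBER
    #   );
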
    def remote_renew_lease(self, storage_index, renew_secret):
        cur = self._owner_db_cur
        cur.execute("SELECT leases.lease_id FROM buckets, leases"
                    " WHERE buckets.storage_index = ?"
                    " AND buckets.bucket_id = leases.bucket_id"
                    " AND leases.renew_secret = ?",
                    (storage_index, renew_secret))
        res = cur.fetchone()
        if res:
            # found it, now update it. The new lease will expire one month
            # from now.
            expire_time = time.time() + 31*24*60*60
            lease_id = res[0]
            cur.execute("UPDATE leases"
                        " SET expire_time = ?"
                        " WHERE lease_id = ?",
                        (expire_time, lease_id))
        else:
            raise IndexError("No such lease")
        self._owner_db_con.commit()

    def remote_cancel_lease(self, storage_index, cancel_secret):
        cur = self._owner_db_cur
        cur.execute("SELECT l.lease_id, b.storage_index, b.bucket_id"
                    " FROM buckets b, leases l"
                    " WHERE b.storage_index = ?"
                    " AND b.bucket_id = l.bucket_id"
                    " AND l.cancel_secret = ?",
                    (storage_index, cancel_secret))
        res = cur.fetchone()
        if res:
            lease_id, storage_index, bucket_id = res
            cur.execute("DELETE FROM leases WHERE lease_id = ?",
                        (lease_id,))
            # was that the last one?
            cur.execute("SELECT COUNT(*) FROM leases WHERE bucket_id = ?",
                        (bucket_id,))
            res = cur.fetchone()
            remaining_leases = res[0]
            if not remaining_leases:
                # yes: remove the bucket record and its share data too
                cur.execute("DELETE FROM buckets WHERE bucket_id = ?",
                            (bucket_id,))
                self.delete_bucket(storage_index)
        else:
            raise IndexError("No such lease")
        self._owner_db_con.commit()

    def delete_bucket(self, storage_index):
        storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
        # measure the usage of this directory, to remove it from our current
        # space-consumed total
        consumed = fileutil.du(storagedir)
        fileutil.rm_dir(storagedir)
        self.consumed -= consumed

    def bucket_writer_closed(self, bw, consumed_size):
        self.consumed += consumed_size
        del self._active_writers[bw]

    def remote_get_buckets(self, storage_index):
        bucketreaders = {} # k: sharenum, v: BucketReader
        storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
        try:
            for f in os.listdir(storagedir):
                if NUM_RE.match(f):
                    br = BucketReader(os.path.join(storagedir, f))
                    bucketreaders[int(f)] = br
        except OSError:
            # Commonly caused by there being no buckets at all.
            pass
        return bucketreaders

"""
Share data is written into a single file. At the start of the file, there is
a series of four-byte big-endian offset values, which indicate where each
section starts. Each offset is measured from the beginning of the file.

0x00: segment size
0x04: data size
0x08: offset of data (=00 00 00 20)
0x0c: offset of plaintext_hash_tree
0x10: offset of crypttext_hash_tree
0x14: offset of block_hashes
0x18: offset of share_hashes
0x1c: offset of uri_extension_length + uri_extension

0x20: start of data
?   : start of plaintext_hash_tree
?   : start of crypttext_hash_tree
?   : start of block_hashes
?   : start of share_hashes
       each share_hash is written as a two-byte (big-endian) hashnum
       followed by the 32-byte SHA-256 hash. We only store the hashes
       necessary to validate the share hash root
?   : start of uri_extension_length (four-byte big-endian value)
?   : start of uri_extension
"""

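# Worked example (an illustrative addition, not in the original module): the
# helper below redoes the WriteBucketProxy offset arithmetic for a small share
# so the header layout described above is concrete. All sizes are arbitrary.
def _example_header_offsets():
    data_size = 300                 # e.g. three segments of 128, 128, 44 bytes
    num_segments = 3
    num_share_hashes = 4
    effective_segments = mathutil.next_power_of_k(num_segments, 2)  # 4
    segment_hash_size = (2*effective_segments - 1) * HASH_SIZE      # 7*32 = 224
    share_hash_size = num_share_hashes * (2+HASH_SIZE)              # 4*34 = 136
    offsets = {}
    x = 0x20                             # eight four-byte header fields
    offsets['data'] = x                  # 32
    x += data_size
    offsets['plaintext_hash_tree'] = x   # 332
    x += segment_hash_size
    offsets['crypttext_hash_tree'] = x   # 556
    x += segment_hash_size
    offsets['block_hashes'] = x          # 780
    x += segment_hash_size
    offsets['share_hashes'] = x          # 1004
    x += share_hash_size
    offsets['uri_extension'] = x         # 1140
    return offsets
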
def allocated_size(data_size, num_segments, num_share_hashes,
                   uri_extension_size):
    wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes,
                           uri_extension_size)
    uri_extension_starts_at = wbp._offsets['uri_extension']
    return uri_extension_starts_at + 4 + uri_extension_size

class WriteBucketProxy:
    implements(IStorageBucketWriter)

    def __init__(self, rref, data_size, segment_size, num_segments,
                 num_share_hashes, uri_extension_size):
        self._rref = rref
        self._data_size = data_size
        self._segment_size = segment_size
        self._num_segments = num_segments

        effective_segments = mathutil.next_power_of_k(num_segments, 2)
        self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
        # how many share hashes are included in each share? This will be
        # about log2(num_shares).
        self._share_hash_size = num_share_hashes * (2+HASH_SIZE)
        # we commit to not sending a uri extension larger than this
        self._uri_extension_size = uri_extension_size

        offsets = self._offsets = {}
        x = 0x20 # the header is eight four-byte fields
        offsets['data'] = x
        x += data_size
        offsets['plaintext_hash_tree'] = x
        x += self._segment_hash_size
        offsets['crypttext_hash_tree'] = x
        x += self._segment_hash_size
        offsets['block_hashes'] = x
        x += self._segment_hash_size
        offsets['share_hashes'] = x
        x += self._share_hash_size
        offsets['uri_extension'] = x

        offset_data = struct.pack(">LLLLLLLL",
                                  segment_size,
                                  data_size,
                                  offsets['data'],
                                  offsets['plaintext_hash_tree'],
                                  offsets['crypttext_hash_tree'],
                                  offsets['block_hashes'],
                                  offsets['share_hashes'],
                                  offsets['uri_extension'],
                                  )
        assert len(offset_data) == 8*4
        self._offset_data = offset_data

    def start(self):
        # write the offset table at the beginning of the share file
        return self._write(0, self._offset_data)

    def put_block(self, segmentnum, data):
        offset = self._offsets['data'] + segmentnum * self._segment_size
        assert offset + len(data) <= self._offsets['uri_extension']
        assert isinstance(data, str)
        if segmentnum < self._num_segments-1:
            precondition(len(data) == self._segment_size,
                         len(data), self._segment_size)
        else:
            # the last segment may be shorter than the rest
            precondition(len(data) == (self._data_size -
                                       (self._segment_size *
                                        (self._num_segments - 1))),
                         len(data), self._segment_size)
        return self._write(offset, data)

    def put_plaintext_hashes(self, hashes):
        offset = self._offsets['plaintext_hash_tree']
        assert isinstance(hashes, list)
        data = "".join(hashes)
        precondition(len(data) == self._segment_hash_size,
                     len(data), self._segment_hash_size)
        precondition(offset+len(data) <= self._offsets['crypttext_hash_tree'],
                     offset, len(data), offset+len(data),
                     self._offsets['crypttext_hash_tree'])
        return self._write(offset, data)

    def put_crypttext_hashes(self, hashes):
        offset = self._offsets['crypttext_hash_tree']
        assert isinstance(hashes, list)
        data = "".join(hashes)
        precondition(len(data) == self._segment_hash_size,
                     len(data), self._segment_hash_size)
        precondition(offset + len(data) <= self._offsets['block_hashes'],
                     offset, len(data), offset+len(data),
                     self._offsets['block_hashes'])
        return self._write(offset, data)

    def put_block_hashes(self, blockhashes):
        offset = self._offsets['block_hashes']
        assert isinstance(blockhashes, list)
        data = "".join(blockhashes)
        precondition(len(data) == self._segment_hash_size,
                     len(data), self._segment_hash_size)
        precondition(offset + len(data) <= self._offsets['share_hashes'],
                     offset, len(data), offset+len(data),
                     self._offsets['share_hashes'])
        return self._write(offset, data)

    def put_share_hashes(self, sharehashes):
        # sharehashes is a list of (index, hash) tuples, so they get stored
        # as 2+32=34 bytes each
        offset = self._offsets['share_hashes']
        assert isinstance(sharehashes, list)
        data = "".join([struct.pack(">H", hashnum) + hashvalue
                        for hashnum,hashvalue in sharehashes])
        precondition(len(data) == self._share_hash_size,
                     len(data), self._share_hash_size)
        precondition(offset + len(data) <= self._offsets['uri_extension'],
                     offset, len(data), offset+len(data),
                     self._offsets['uri_extension'])
        return self._write(offset, data)

    def put_uri_extension(self, data):
        offset = self._offsets['uri_extension']
        assert isinstance(data, str)
        precondition(len(data) <= self._uri_extension_size,
                     len(data), self._uri_extension_size)
        length = struct.pack(">L", len(data))
        return self._write(offset, length+data)

    def _write(self, offset, data):
        # TODO: for small shares, buffer the writes and do just a single call
        return self._rref.callRemote("write", offset, data)

    def close(self):
        return self._rref.callRemote("close")

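# Usage sketch (an assumption about the calling side, not from the original
# module): how an uploader might drive a WriteBucketProxy once it holds an
# RIBucketWriter remote reference from remote_allocate_buckets(). The function
# name and argument lists are hypothetical; the order of the put_* calls
# follows the share layout documented above.
def _example_write_share(rref, blocks, plaintext_hashes, crypttext_hashes,
                         block_hashes, share_hashes, uri_extension):
    segment_size = len(blocks[0])
    data_size = sum([len(b) for b in blocks])
    wbp = WriteBucketProxy(rref, data_size, segment_size, len(blocks),
                           len(share_hashes), len(uri_extension))
    d = wbp.start() # write the offset table first
    for i in range(len(blocks)):
        d.addCallback(lambda res, i=i: wbp.put_block(i, blocks[i]))
    d.addCallback(lambda res: wbp.put_plaintext_hashes(plaintext_hashes))
    d.addCallback(lambda res: wbp.put_crypttext_hashes(crypttext_hashes))
    d.addCallback(lambda res: wbp.put_block_hashes(block_hashes))
    d.addCallback(lambda res: wbp.put_share_hashes(share_hashes))
    d.addCallback(lambda res: wbp.put_uri_extension(uri_extension))
    d.addCallback(lambda res: wbp.close())
    return d
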
class ReadBucketProxy:
    implements(IStorageBucketReader)
    def __init__(self, rref):
        self._rref = rref
        self._started = False

    def startIfNecessary(self):
        if self._started:
            return defer.succeed(self)
        d = self.start()
        d.addCallback(lambda res: self)
        return d

    def start(self):
        # TODO: for small shares, read the whole bucket in start()
        d = self._read(0, 8*4)
        d.addCallback(self._parse_offsets)
        return d

    def _parse_offsets(self, data):
        precondition(len(data) == 8*4)
        self._offsets = {}
        self._segment_size = struct.unpack(">L", data[0:4])[0]
        self._data_size = struct.unpack(">L", data[4:8])[0]
        x = 0x08
        for field in ( 'data',
                       'plaintext_hash_tree',
                       'crypttext_hash_tree',
                       'block_hashes',
                       'share_hashes',
                       'uri_extension',
                       ):
            offset = struct.unpack(">L", data[x:x+4])[0]
            x += 4
            self._offsets[field] = offset
        self._started = True
        return self._offsets

    def get_block(self, blocknum):
        num_segments = mathutil.div_ceil(self._data_size, self._segment_size)
        if blocknum < num_segments-1:
            size = self._segment_size
        else:
            size = self._data_size % self._segment_size
            if size == 0:
                size = self._segment_size
        offset = self._offsets['data'] + blocknum * self._segment_size
        return self._read(offset, size)

    def _str2l(self, s):
        """ split a string (pulled from storage) into a list of
        HASH_SIZE-byte hashes """
        return [ s[i:i+HASH_SIZE]
                 for i in range(0, len(s), HASH_SIZE) ]

    def get_plaintext_hashes(self):
        offset = self._offsets['plaintext_hash_tree']
        size = self._offsets['crypttext_hash_tree'] - offset
        d = self._read(offset, size)
        d.addCallback(self._str2l)
        return d

    def get_crypttext_hashes(self):
        offset = self._offsets['crypttext_hash_tree']
        size = self._offsets['block_hashes'] - offset
        d = self._read(offset, size)
        d.addCallback(self._str2l)
        return d

    def get_block_hashes(self):
        offset = self._offsets['block_hashes']
        size = self._offsets['share_hashes'] - offset
        d = self._read(offset, size)
        d.addCallback(self._str2l)
        return d

    def get_share_hashes(self):
        offset = self._offsets['share_hashes']
        size = self._offsets['uri_extension'] - offset
        assert size % (2+HASH_SIZE) == 0
        d = self._read(offset, size)
        def _unpack_share_hashes(data):
            assert len(data) == size
            hashes = []
            for i in range(0, size, 2+HASH_SIZE):
                hashnum = struct.unpack(">H", data[i:i+2])[0]
                hashvalue = data[i+2:i+2+HASH_SIZE]
                hashes.append( (hashnum, hashvalue) )
            return hashes
        d.addCallback(_unpack_share_hashes)
        return d

    def get_uri_extension(self):
        offset = self._offsets['uri_extension']
        d = self._read(offset, 4)
        def _got_length(data):
            length = struct.unpack(">L", data)[0]
            return self._read(offset+4, length)
        d.addCallback(_got_length)
        return d

    def _read(self, offset, length):
        return self._rref.callRemote("read", offset, length)

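# Read-side sketch under the same caveats: given an RIBucketReader remote
# reference (as returned by remote_get_buckets()), fetch a single block. The
# function name and arguments are illustrative only.
def _example_read_block(rref, blocknum):
    rbp = ReadBucketProxy(rref)
    d = rbp.startIfNecessary()  # reads and parses the offset table once
    d.addCallback(lambda rbp: rbp.get_block(blocknum))
    return d                    # fires with the requested block of share data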