import os, re, weakref, stat, struct, time
from itertools import chain

from foolscap import Referenceable
from twisted.application import service
from twisted.internet import defer

from zope.interface import implements
from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
     RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE, \
     BadWriteEnablerError
from allmydata.util import fileutil, idlib, mathutil
from allmydata.util.assertutil import precondition, _assert

class DataTooLargeError(Exception):
    pass

# storage/
# storage/shares/incoming
#   incoming/ holds temp dirs named $STORAGEINDEX/$SHARENUM which will be
#   moved to storage/shares/$STORAGEINDEX/$SHARENUM upon success
# storage/shares/$STORAGEINDEX
# storage/shares/$STORAGEINDEX/$SHARENUM

# $SHARENUM matches this regex:
NUM_RE=re.compile("^[0-9]+$")
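
# For example (illustrative only; the storage index below is a made-up
# base32 string), share number 0 of one file would live at
#   storage/shares/aiw6qrz3qbdmkvmxxuzmyzenxu/0
# and, while still being uploaded, at
#   storage/shares/incoming/aiw6qrz3qbdmkvmxxuzmyzenxu/0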

# each share file (in storage/shares/$SI/$SHNUM) contains lease information
# and share data. The share data is accessed by RIBucketWriter.write and
# RIBucketReader.read . The lease information is not accessible through these
# interfaces.

# The share file has the following layout:
#  0x00: share file version number, four bytes, current version is 1
#  0x04: share data length, four bytes big-endian = A
#  0x08: number of leases, four bytes big-endian
#  0x0c: beginning of share data (described below, at WriteBucketProxy)
#  A+0x0c = B: first lease. Lease format is:
#   B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner
#   B+0x04: renew secret, 32 bytes (SHA256)
#   B+0x24: cancel secret, 32 bytes (SHA256)
#   B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch
#   B+0x48: next lease, or end of record
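
# An illustrative sketch, not used by the server: given the path of an
# immutable share file, unpack the header and lease records laid out above.
# The helper name is ours, not part of the original API.
def _example_dump_immutable_share(filename):
    f = open(filename, 'rb')
    (version, data_length, num_leases) = struct.unpack(">LLL", f.read(0xc))
    f.seek(0xc + data_length) # lease records start right after the data
    lease_fmt = ">L32s32sL"
    leases = [struct.unpack(lease_fmt, f.read(struct.calcsize(lease_fmt)))
              for i in range(num_leases)]
    f.close()
    return (version, data_length, leases)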

class ShareFile:
    LEASE_SIZE = struct.calcsize(">L32s32sL")

    def __init__(self, filename):
        self.home = filename
        f = open(self.home, 'rb')
        (version, size, num_leases) = struct.unpack(">LLL", f.read(0xc))
        f.close()
        assert version == 1
        self._size = size
        self._num_leases = num_leases
        self._data_offset = 0xc
        self._lease_offset = 0xc + self._size

    def read_share_data(self, offset, length):
        precondition(offset >= 0)
        precondition(offset+length <= self._size)
        f = open(self.home, 'rb')
        f.seek(self._data_offset+offset)
        return f.read(length)

    def write_share_data(self, offset, data):
        length = len(data)
        precondition(offset >= 0)
        precondition(offset+length <= self._size)
        f = open(self.home, 'rb+')
        real_offset = self._data_offset+offset
        f.seek(real_offset)
        assert f.tell() == real_offset
        f.write(data)
        f.close()

    def _write_lease_record(self, f, lease_number, lease_info):
        (owner_num, renew_secret, cancel_secret, expiration_time) = lease_info
        offset = self._lease_offset + lease_number * self.LEASE_SIZE
        f.seek(offset)
        assert f.tell() == offset
        f.write(struct.pack(">L32s32sL",
                            owner_num, renew_secret, cancel_secret,
                            int(expiration_time)))

    def _read_num_leases(self, f):
        f.seek(0x08) # the lease count lives at offset 0x08 in the header
        (num_leases,) = struct.unpack(">L", f.read(4))
        return num_leases

    def _write_num_leases(self, f, num_leases):
        f.seek(0x08)
        f.write(struct.pack(">L", num_leases))

    def _truncate_leases(self, f, num_leases):
        f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE)

    def iter_leases(self):
        """Yields (ownernum, renew_secret, cancel_secret, expiration_time)
        for all leases."""
        f = open(self.home, 'rb')
        (version, size, num_leases) = struct.unpack(">LLL", f.read(0xc))
        f.seek(self._lease_offset)
        for i in range(num_leases):
            data = f.read(self.LEASE_SIZE)
            if data:
                yield struct.unpack(">L32s32sL", data)

    def add_lease(self, lease_info):
        f = open(self.home, 'rb+')
        num_leases = self._read_num_leases(f)
        self._write_lease_record(f, num_leases, lease_info)
        self._write_num_leases(f, num_leases+1)
        f.close()

    def renew_lease(self, renew_secret, new_expire_time):
        for i,(on,rs,cs,et) in enumerate(self.iter_leases()):
            if rs == renew_secret:
                # yup. See if we need to update the owner time.
                if new_expire_time > et:
                    # yes
                    new_lease = (on,rs,cs,new_expire_time)
                    f = open(self.home, 'rb+')
                    self._write_lease_record(f, i, new_lease)
                    f.close()
                return
        raise IndexError("unable to renew non-existent lease")

    def add_or_renew_lease(self, lease_info):
        owner_num, renew_secret, cancel_secret, expire_time = lease_info
        try:
            self.renew_lease(renew_secret, expire_time)
        except IndexError:
            self.add_lease(lease_info)

    def cancel_lease(self, cancel_secret):
        """Remove a lease with the given cancel_secret. Return
        (num_remaining_leases, space_freed). Raise IndexError if there was no
        lease with the given cancel_secret."""

        leases = list(self.iter_leases())
        num_leases_removed = 0
        for i,lease_info in enumerate(leases[:]):
            (on,rs,cs,et) = lease_info
            if cs == cancel_secret:
                leases[i] = None
                num_leases_removed += 1
        if not num_leases_removed:
            raise IndexError("unable to find matching lease to cancel")
        # pack and write out the remaining leases. We write these out in
        # the same order as they were added, so that if we crash while
        # doing this, we won't lose any non-cancelled leases.
        leases = [l for l in leases if l] # remove the cancelled leases
        f = open(self.home, 'rb+')
        for i,lease in enumerate(leases):
            self._write_lease_record(f, i, lease)
        self._write_num_leases(f, len(leases))
        self._truncate_leases(f, len(leases))
        f.close()
        return len(leases), self.LEASE_SIZE * num_leases_removed
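
# An illustrative sketch of the expected lease lifecycle on a ShareFile.
# The helper is ours, for exposition only; real callers derive the secrets
# from the client's lease secrets (see remote_allocate_buckets below), and
# owner number 0 matches the server's default.
def _example_lease_lifecycle(sf, renew_secret, cancel_secret):
    now = time.time()
    sf.add_or_renew_lease((0, renew_secret, cancel_secret, now + 60))
    sf.renew_lease(renew_secret, now + 3600) # extend the expiration time
    return sf.cancel_lease(cancel_secret) # (remaining_leases, space_freed)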

class BucketWriter(Referenceable):
    implements(RIBucketWriter)

    def __init__(self, ss, incominghome, finalhome, size, lease_info):
        self.ss = ss
        self.incominghome = incominghome
        self.finalhome = finalhome
        self._size = size
        self.closed = False
        self.throw_out_all_data = False
        # touch the file, so later callers will see that we're working on it.
        # Also construct the metadata.
        assert not os.path.exists(self.incominghome)
        f = open(self.incominghome, 'wb')
        f.write(struct.pack(">LLL", 1, size, 0))
        f.close()
        self._sharefile = ShareFile(self.incominghome)
        # also, add our lease to the file now, so that other ones can be
        # added by simultaneous uploaders
        self._sharefile.add_lease(lease_info)

    def allocated_size(self):
        return self._size

    def remote_write(self, offset, data):
        precondition(not self.closed)
        if self.throw_out_all_data:
            return
        self._sharefile.write_share_data(offset, data)

    def remote_close(self):
        precondition(not self.closed)
        fileutil.rename(self.incominghome, self.finalhome)
        self._sharefile = None
        self.closed = True

        filelen = os.stat(self.finalhome)[stat.ST_SIZE]
        self.ss.bucket_writer_closed(self, filelen)

        # if we were the last share to be moved, remove the incoming/
        # directory that was our parent
        parentdir = os.path.split(self.incominghome)[0]
        if not os.listdir(parentdir):
            os.rmdir(parentdir)

class BucketReader(Referenceable):
    implements(RIBucketReader)

    def __init__(self, home):
        self._share_file = ShareFile(home)

    def remote_read(self, offset, length):
        return self._share_file.read_share_data(offset, length)

# the MutableShareFile is like the ShareFile, but used for mutable data. It
# has a different layout. See docs/mutable.txt for more details.

# #   offset    size    name
# 1   0         32      magic verstr "tahoe mutable container v1" plus binary
# 2   32        32      write enabler's nodeid
# 3   64        32      write enabler
# 4   96        8       data size (actual share data present) (a)
# 5   104       8       offset of (8) count of extra leases (after data)
# 6   112       416     four leases, 104 bytes each
#                        0    4   ownerid (0 means "no lease here")
#                        4    4   expiration timestamp
#                        8    32  renew secret
#                        40   32  cancel secret
#                        72   32  nodeid which accepted the tokens
# 7   528       (a)     data
# 8   ??        4       count of extra leases
# 9   ??        n*104   extra leases
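
# An illustrative sketch, not used by the server: unpack the fixed-size
# mutable-container header laid out above. The helper name is ours, not
# part of the original API.
def _example_dump_mutable_header(filename):
    f = open(filename, 'rb')
    header = f.read(struct.calcsize(">32s32s32sQQ"))
    (magic, write_enabler_nodeid, write_enabler,
     data_length, extra_lease_offset) = struct.unpack(">32s32s32sQQ", header)
    f.close()
    return (magic, write_enabler_nodeid, data_length, extra_lease_offset)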

# struct is used to pack these fields, so make sure the sizes match the
# layout above
assert struct.calcsize(">L") == 4, struct.calcsize(">L")
assert struct.calcsize(">Q") == 8, struct.calcsize(">Q")

class MutableShareFile:

    DATA_LENGTH_OFFSET = struct.calcsize(">32s32s32s")
    EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
    HEADER_SIZE = struct.calcsize(">32s32s32sQQ") # doesn't include leases
    LEASE_SIZE = struct.calcsize(">LL32s32s32s")
    assert LEASE_SIZE == 104
    DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
    assert DATA_OFFSET == 528, DATA_OFFSET
    # our sharefiles start with a recognizable string, plus some random
    # binary data to reduce the chance that a regular text file will look
    # like a sharefile.
    MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
    assert len(MAGIC) == 32
    MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary
    # TODO: decide upon a policy for max share size

    def __init__(self, filename):
        self.home = filename
        if os.path.exists(self.home):
            # we don't cache anything, just check the magic
            f = open(self.home, 'rb')
            data = f.read(self.HEADER_SIZE)
            (magic,
             write_enabler_nodeid, write_enabler,
             data_length, extra_lease_offset) = \
             struct.unpack(">32s32s32sQQ", data)
            assert magic == self.MAGIC

    def create(self, my_nodeid, write_enabler):
        assert not os.path.exists(self.home)
        data_length = 0
        extra_lease_offset = (self.HEADER_SIZE
                              + 4 * self.LEASE_SIZE
                              + data_length)
        assert extra_lease_offset == self.DATA_OFFSET # true at creation
        num_extra_leases = 0
        f = open(self.home, 'wb')
        header = struct.pack(">32s32s32sQQ",
                             self.MAGIC, my_nodeid, write_enabler,
                             data_length, extra_lease_offset,
                             )
        leases = ("\x00"*self.LEASE_SIZE) * 4
        f.write(header + leases)
        # data goes here, empty after creation
        f.write(struct.pack(">L", num_extra_leases))
        # extra leases go here, none at creation
        f.close()

    def _read_data_length(self, f):
        f.seek(self.DATA_LENGTH_OFFSET)
        (data_length,) = struct.unpack(">Q", f.read(8))
        return data_length

    def _write_data_length(self, f, data_length):
        f.seek(self.DATA_LENGTH_OFFSET)
        f.write(struct.pack(">Q", data_length))

    def _read_share_data(self, f, offset, length):
        precondition(offset >= 0)
        data_length = self._read_data_length(f)
        if offset+length > data_length:
            # reads beyond the end of the data are truncated. Reads that
            # start beyond the end of the data return an empty string.
            length = max(0, data_length-offset)
        if length == 0:
            return ""
        precondition(offset+length <= data_length)
        f.seek(self.DATA_OFFSET+offset)
        data = f.read(length)
        return data

    def _read_extra_lease_offset(self, f):
        f.seek(self.EXTRA_LEASE_OFFSET)
        (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
        return extra_lease_offset

    def _write_extra_lease_offset(self, f, offset):
        f.seek(self.EXTRA_LEASE_OFFSET)
        f.write(struct.pack(">Q", offset))

    def _read_num_extra_leases(self, f):
        offset = self._read_extra_lease_offset(f)
        f.seek(offset)
        (num_extra_leases,) = struct.unpack(">L", f.read(4))
        return num_extra_leases

    def _write_num_extra_leases(self, f, num_leases):
        extra_lease_offset = self._read_extra_lease_offset(f)
        f.seek(extra_lease_offset)
        f.write(struct.pack(">L", num_leases))

    def _change_container_size(self, f, new_container_size):
        if new_container_size > self.MAX_SIZE:
            raise DataTooLargeError()
        old_extra_lease_offset = self._read_extra_lease_offset(f)
        new_extra_lease_offset = self.DATA_OFFSET + new_container_size
        if new_extra_lease_offset < old_extra_lease_offset:
            # TODO: allow containers to shrink. For now they remain large.
            return
        num_extra_leases = self._read_num_extra_leases(f)
        f.seek(old_extra_lease_offset)
        extra_lease_data = f.read(4 + num_extra_leases * self.LEASE_SIZE)
        f.seek(new_extra_lease_offset)
        f.write(extra_lease_data)
        # an interrupt here will corrupt the leases, iff the move caused the
        # extra leases to overlap.
        self._write_extra_lease_offset(f, new_extra_lease_offset)

    def _write_share_data(self, f, offset, data):
        length = len(data)
        precondition(offset >= 0)
        data_length = self._read_data_length(f)
        extra_lease_offset = self._read_extra_lease_offset(f)

        if offset+length >= data_length:
            # They are expanding their data size.
            if self.DATA_OFFSET+offset+length > extra_lease_offset:
                # Their new data won't fit in the current container, so we
                # have to move the leases. With luck, they're expanding it
                # more than the size of the extra lease block, which will
                # minimize the corrupt-the-share window
                self._change_container_size(f, offset+length)
                extra_lease_offset = self._read_extra_lease_offset(f)

                # an interrupt here is ok.. the container has been enlarged
                # but the data remains untouched

            assert self.DATA_OFFSET+offset+length <= extra_lease_offset
            # Their data now fits in the current container. We must write
            # their new data and modify the recorded data size.
            new_data_length = offset+length
            self._write_data_length(f, new_data_length)
            # an interrupt here will result in a corrupted share

        # now all that's left to do is write out their data
        f.seek(self.DATA_OFFSET+offset)
        f.write(data)

    def _write_lease_record(self, f, lease_number, lease_info):
        (ownerid, expiration_time,
         renew_secret, cancel_secret, nodeid) = lease_info
        extra_lease_offset = self._read_extra_lease_offset(f)
        num_extra_leases = self._read_num_extra_leases(f)
        if lease_number < 4:
            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
        elif (lease_number-4) < num_extra_leases:
            offset = (extra_lease_offset
                      + 4 # skip the 4-byte extra-lease count
                      + (lease_number-4)*self.LEASE_SIZE)
        else:
            # must add an extra lease record
            self._write_num_extra_leases(f, num_extra_leases+1)
            offset = (extra_lease_offset
                      + 4
                      + (lease_number-4)*self.LEASE_SIZE)
        f.seek(offset)
        assert f.tell() == offset
        f.write(struct.pack(">LL32s32s32s",
                            ownerid, int(expiration_time),
                            renew_secret, cancel_secret, nodeid))

    def _read_lease_record(self, f, lease_number):
        # returns a 5-tuple of lease info, or None
        extra_lease_offset = self._read_extra_lease_offset(f)
        num_extra_leases = self._read_num_extra_leases(f)
        if lease_number < 4:
            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
        elif (lease_number-4) < num_extra_leases:
            offset = (extra_lease_offset
                      + 4
                      + (lease_number-4)*self.LEASE_SIZE)
        else:
            raise IndexError("No such lease number %d" % lease_number)
        f.seek(offset)
        assert f.tell() == offset
        data = f.read(self.LEASE_SIZE)
        lease_info = struct.unpack(">LL32s32s32s", data)
        (ownerid, expiration_time,
         renew_secret, cancel_secret, nodeid) = lease_info
        if ownerid == 0:
            # an ownerid of zero means "this lease slot is empty"
            return None
        return lease_info

    def _get_num_lease_slots(self, f):
        # how many places do we have allocated for leases? Not all of them
        # are filled.
        num_extra_leases = self._read_num_extra_leases(f)
        return 4+num_extra_leases

    def _get_first_empty_lease_slot(self, f):
        # return an int with the index of an empty slot, or None if we do not
        # currently have an empty slot
        for i in range(self._get_num_lease_slots(f)):
            if self._read_lease_record(f, i) is None:
                return i
        return None

    def _enumerate_leases(self, f):
        """Yields (leasenum, (ownerid, expiration_time, renew_secret,
        cancel_secret, accepting_nodeid)) for all leases."""
        for i in range(self._get_num_lease_slots(f)):
            try:
                data = self._read_lease_record(f, i)
                if data is not None:
                    yield (i, data)
            except IndexError:
                return

    def debug_get_leases(self):
        f = open(self.home, 'rb')
        leases = list(self._enumerate_leases(f))
        f.close()
        return leases

    def add_lease(self, lease_info):
        f = open(self.home, 'rb+')
        num_lease_slots = self._get_num_lease_slots(f)
        empty_slot = self._get_first_empty_lease_slot(f)
        if empty_slot is not None:
            self._write_lease_record(f, empty_slot, lease_info)
        else:
            self._write_lease_record(f, num_lease_slots, lease_info)
        f.close()

    def renew_lease(self, renew_secret, new_expire_time):
        accepting_nodeids = set()
        f = open(self.home, 'rb+')
        for (leasenum,(oid,et,rs,cs,anid)) in self._enumerate_leases(f):
            if rs == renew_secret:
                # yup. See if we need to update the owner time.
                if new_expire_time > et:
                    # yes
                    new_lease = (oid,new_expire_time,rs,cs,anid)
                    self._write_lease_record(f, leasenum, new_lease)
                f.close()
                return
            accepting_nodeids.add(anid)
        f.close()
        # Return the accepting_nodeids set, to give the client a chance to
        # update the leases on a share which has been migrated from its
        # original server to a new one.
        msg = ("Unable to renew non-existent lease. I have leases accepted by"
               " nodeids: ")
        msg += ",".join([("'%s'" % idlib.b2a(anid))
                         for anid in accepting_nodeids])
        raise IndexError(msg)

    def add_or_renew_lease(self, lease_info):
        ownerid, expire_time, renew_secret, cancel_secret, anid = lease_info
        try:
            self.renew_lease(renew_secret, expire_time)
        except IndexError:
            self.add_lease(lease_info)

    def cancel_lease(self, cancel_secret):
        """Remove any leases with the given cancel_secret. Return
        (num_remaining_leases, space_freed). Raise IndexError if there was no
        lease with the given cancel_secret."""

        accepting_nodeids = set()
        modified = 0
        remaining = 0
        blank = "\x00"*32
        blank_lease = (0, 0, blank, blank, blank)
        f = open(self.home, 'rb+')
        for (leasenum,(oid,et,rs,cs,anid)) in self._enumerate_leases(f):
            accepting_nodeids.add(anid)
            if cs == cancel_secret:
                self._write_lease_record(f, leasenum, blank_lease)
                modified += 1
            else:
                remaining += 1
        if modified:
            freed_space = self._pack_leases(f)
            f.close()
            return (remaining, freed_space)
        f.close()
        msg = ("Unable to cancel non-existent lease. I have leases "
               "accepted by nodeids: ")
        msg += ",".join([("'%s'" % idlib.b2a(anid))
                         for anid in accepting_nodeids])
        raise IndexError(msg)

    def _pack_leases(self, f):
        # TODO: reclaim space from cancelled leases
        return 0

    def _read_write_enabler_and_nodeid(self, f):
        f.seek(0)
        data = f.read(self.HEADER_SIZE)
        (magic,
         write_enabler_nodeid, write_enabler,
         data_length, extra_lease_offset) = \
         struct.unpack(">32s32s32sQQ", data)
        assert magic == self.MAGIC
        return (write_enabler, write_enabler_nodeid)

    def readv(self, readv):
        datav = []
        f = open(self.home, 'rb')
        for (offset, length) in readv:
            datav.append(self._read_share_data(f, offset, length))
        f.close()
        return datav

#    def remote_get_length(self):
#        f = open(self.home, 'rb')
#        data_length = self._read_data_length(f)
#        f.close()
#        return data_length

    def check_write_enabler(self, write_enabler):
        f = open(self.home, 'rb+')
        (real_write_enabler, write_enabler_nodeid) = \
         self._read_write_enabler_and_nodeid(f)
        f.close()
        if write_enabler != real_write_enabler:
            # accommodate share migration by reporting the nodeid used for
            # the old write enabler.
            msg = "The write enabler was recorded by nodeid '%s'." % \
                  (idlib.b2a(write_enabler_nodeid),)
            raise BadWriteEnablerError(msg)

    def check_testv(self, testv):
        test_good = True
        f = open(self.home, 'rb+')
        for (offset, length, operator, specimen) in testv:
            data = self._read_share_data(f, offset, length)
            if not self.compare(data, operator, specimen):
                test_good = False
                break
        f.close()
        return test_good

    def writev(self, datav, new_length):
        f = open(self.home, 'rb+')
        for (offset, data) in datav:
            self._write_share_data(f, offset, data)
        if new_length is not None:
            self._change_container_size(f, new_length)
            f.seek(self.DATA_LENGTH_OFFSET)
            f.write(struct.pack(">Q", new_length))
        f.close()

    def compare(self, a, op, b):
        assert op in ("lt", "le", "eq", "ne", "ge", "gt")
        return {"lt": a < b, "le": a <= b, "eq": a == b,
                "ne": a != b, "ge": a >= b, "gt": a > b}[op]

class EmptyShare:

    def check_testv(self, testv):
        for (offset, length, operator, specimen) in testv:
            data = "" # every read of an empty share returns ""
            if not self.compare(data, operator, specimen):
                return False
        return True

    def compare(self, a, op, b):
        assert op in ("lt", "le", "eq", "ne", "ge", "gt")
        return {"lt": a < b, "le": a <= b, "eq": a == b,
                "ne": a != b, "ge": a >= b, "gt": a > b}[op]
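
# Illustrative examples of the test-vector convention shared by
# MutableShareFile.check_testv and EmptyShare.check_testv: each element is
# an (offset, length, operator, specimen) tuple. Since every read of an
# empty share returns "", an EmptyShare satisfies exactly the tests whose
# specimen compares true against the empty string:
#
#   EmptyShare().check_testv([(0, 4, "eq", "")])     # True
#   EmptyShare().check_testv([(0, 4, "eq", "data")]) # False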

def create_mutable_sharefile(filename, my_nodeid, write_enabler):
    ms = MutableShareFile(filename)
    ms.create(my_nodeid, write_enabler)
    return MutableShareFile(filename)

class StorageServer(service.MultiService, Referenceable):
    implements(RIStorageServer)
    name = 'storageserver'

    def __init__(self, storedir, sizelimit=None, no_storage=False):
        service.MultiService.__init__(self)
        self.storedir = storedir
        sharedir = os.path.join(storedir, "shares")
        fileutil.make_dirs(sharedir)
        self.sharedir = sharedir
        self.sizelimit = sizelimit
        self.no_storage = no_storage
        self.incomingdir = os.path.join(sharedir, 'incoming')
        self._clean_incomplete()
        fileutil.make_dirs(self.incomingdir)
        self._active_writers = weakref.WeakKeyDictionary()
        self.measure_size()

    def setNodeID(self, nodeid):
        # somebody must set this before any slots can be created or leases
        # added
        self.my_nodeid = nodeid

    def startService(self):
        service.MultiService.startService(self)

        nodeid = self.parent.nodeid # 20 bytes, binary
        assert len(nodeid) == 20
        self.setNodeID(nodeid + "\x00"*12) # make it 32 bytes
        # TODO: review this 20-vs-32 thing, settle on one or the other

    def _clean_incomplete(self):
        fileutil.rm_dir(self.incomingdir)

    def measure_size(self):
        self.consumed = fileutil.du(self.sharedir)

    def allocated_size(self):
        space = self.consumed
        for bw in self._active_writers:
            space += bw.allocated_size()
        return space

    def remote_allocate_buckets(self, storage_index,
                                renew_secret, cancel_secret,
                                sharenums, allocated_size,
                                canary, owner_num=0):
        # owner_num is not for clients to set, but rather it should be
        # curried into the PersonalStorageServer instance that is dedicated
        # to a particular owner.
        alreadygot = set()
        bucketwriters = {} # k: shnum, v: BucketWriter
        si_s = idlib.b2a(storage_index)

        # in this implementation, the lease information (including secrets)
        # goes into the share files themselves. It could also be put into a
        # separate database. Note that the lease should not be added until
        # the BucketWriter has been closed.
        expire_time = time.time() + 31*24*60*60
        lease_info = (owner_num, renew_secret, cancel_secret, expire_time)

        space_per_bucket = allocated_size
        no_limits = self.sizelimit is None
        yes_limits = not no_limits
        if yes_limits:
            remaining_space = self.sizelimit - self.allocated_size()

        # fill alreadygot with all shares that we have, not just the ones
        # they asked about: this will save them a lot of work. Add or update
        # leases for all of them: if they want us to hold shares for this
        # file, they'll want us to hold leases for this file.
        for (shnum, fn) in chain(self._get_bucket_shares(storage_index),
                                 self._get_incoming_shares(storage_index)):
            alreadygot.add(shnum)
            sf = ShareFile(fn)
            sf.add_or_renew_lease(lease_info)

        for shnum in sharenums:
            incominghome = os.path.join(self.incomingdir, si_s, "%d" % shnum)
            finalhome = os.path.join(self.sharedir, si_s, "%d" % shnum)
            if os.path.exists(incominghome) or os.path.exists(finalhome):
                # great! we already have it. easy.
                pass
            elif no_limits or remaining_space >= space_per_bucket:
                # ok! we need to create the new share file.
                fileutil.make_dirs(os.path.join(self.incomingdir, si_s))
                bw = BucketWriter(self, incominghome, finalhome,
                                  space_per_bucket, lease_info)
                if self.no_storage:
                    bw.throw_out_all_data = True
                bucketwriters[shnum] = bw
                self._active_writers[bw] = 1
                if yes_limits:
                    remaining_space -= space_per_bucket
            else:
                # bummer! not enough space to accept this bucket
                pass

        if bucketwriters:
            fileutil.make_dirs(os.path.join(self.sharedir, si_s))

        return alreadygot, bucketwriters

    def remote_renew_lease(self, storage_index, renew_secret):
        new_expire_time = time.time() + 31*24*60*60
        found_buckets = False
        for shnum, filename in self._get_bucket_shares(storage_index):
            found_buckets = True
            f = open(filename, 'rb')
            header = f.read(32)
            f.close()
            if header[:32] == MutableShareFile.MAGIC:
                sf = MutableShareFile(filename)
                # note: if the share has been migrated, the renew_lease()
                # call will throw an exception, with information to help the
                # client update the lease.
            elif header[:4] == struct.pack(">L", 1):
                sf = ShareFile(filename)
            else:
                continue # non-sharefile
            sf.renew_lease(renew_secret, new_expire_time)
        if not found_buckets:
            raise IndexError("no such lease to renew")

    def remote_cancel_lease(self, storage_index, cancel_secret):
        storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))

        remaining_files = 0
        total_space_freed = 0
        found_buckets = False
        for shnum, filename in self._get_bucket_shares(storage_index):
            # note: if we can't find a lease on one share, we won't bother
            # looking in the others. Unless something broke internally
            # (perhaps we ran out of disk space while adding a lease), the
            # leases on all shares will be identical.
            found_buckets = True
            f = open(filename, 'rb')
            header = f.read(32)
            f.close()
            if header[:32] == MutableShareFile.MAGIC:
                sf = MutableShareFile(filename)
                # note: if the share has been migrated, the cancel_lease()
                # call will throw an exception, with information to help the
                # client update the lease.
            elif header[:4] == struct.pack(">L", 1):
                sf = ShareFile(filename)
            else:
                continue # non-sharefile
            # this raises IndexError if the lease wasn't present
            remaining_leases, space_freed = sf.cancel_lease(cancel_secret)
            total_space_freed += space_freed
            if remaining_leases:
                remaining_files += 1
            else:
                # now remove the sharefile. We'll almost certainly be
                # removing the entire directory soon.
                filelen = os.stat(filename)[stat.ST_SIZE]
                os.unlink(filename)
                total_space_freed += filelen
        if not remaining_files:
            fileutil.rm_dir(storagedir)
        self.consumed -= total_space_freed
        if not found_buckets:
            raise IndexError("no such lease to cancel")

    def bucket_writer_closed(self, bw, consumed_size):
        self.consumed += consumed_size
        del self._active_writers[bw]

    def _get_bucket_shares(self, storage_index):
        """Return a list of (shnum, pathname) tuples for files that hold
        shares for this storage_index. In each tuple, 'shnum' will always be
        the integer form of the last component of 'pathname'."""
        storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
        try:
            for f in os.listdir(storagedir):
                if NUM_RE.match(f):
                    filename = os.path.join(storagedir, f)
                    yield (int(f), filename)
        except OSError:
            # Commonly caused by there being no buckets at all.
            pass

    def _get_incoming_shares(self, storage_index):
        incomingdir = os.path.join(self.incomingdir, idlib.b2a(storage_index))
        try:
            for f in os.listdir(incomingdir):
                if NUM_RE.match(f):
                    filename = os.path.join(incomingdir, f)
                    yield (int(f), filename)
        except OSError:
            # Commonly caused by there being no buckets at all.
            pass

    def remote_get_buckets(self, storage_index):
        bucketreaders = {} # k: sharenum, v: BucketReader
        for shnum, filename in self._get_bucket_shares(storage_index):
            bucketreaders[shnum] = BucketReader(filename)
        return bucketreaders

    def get_leases(self, storage_index):
        """Provide an iterator that yields all of the leases attached to this
        bucket. Each lease is returned as a tuple of (owner_num,
        renew_secret, cancel_secret, expiration_time).

        This method is not for client use.
        """

        # since all shares get the same lease data, we just grab the leases
        # from the first share
        try:
            shnum, filename = self._get_bucket_shares(storage_index).next()
            sf = ShareFile(filename)
            return sf.iter_leases()
        except StopIteration:
            return iter([])

    def remote_slot_testv_and_readv_and_writev(self, storage_index,
                                               secrets,
                                               test_and_write_vectors,
                                               read_vector):
        si_s = idlib.b2a(storage_index)
        (write_enabler, renew_secret, cancel_secret) = secrets
        # shares exist if there is a file for them
        bucketdir = os.path.join(self.sharedir, si_s)
        shares = {}
        if os.path.isdir(bucketdir):
            for sharenum_s in os.listdir(bucketdir):
                try:
                    sharenum = int(sharenum_s)
                except ValueError:
                    continue
                filename = os.path.join(bucketdir, sharenum_s)
                msf = MutableShareFile(filename)
                msf.check_write_enabler(write_enabler)
                shares[sharenum] = msf
        # write_enabler is good for all existing shares.

        # Now evaluate test vectors.
        testv_is_good = True
        for sharenum in test_and_write_vectors:
            (testv, datav, new_length) = test_and_write_vectors[sharenum]
            if sharenum in shares:
                if not shares[sharenum].check_testv(testv):
                    testv_is_good = False
                    break
            else:
                # compare the vectors against an empty share, in which all
                # reads return empty strings.
                if not EmptyShare().check_testv(testv):
                    testv_is_good = False
                    break

        # now gather the read vectors, before we do any writes
        read_data = {}
        for sharenum, share in shares.items():
            read_data[sharenum] = share.readv(read_vector)

        if testv_is_good:
            # now apply the write vectors
            for sharenum in test_and_write_vectors:
                (testv, datav, new_length) = test_and_write_vectors[sharenum]
                if sharenum not in shares:
                    # allocate a new share
                    allocated_size = 2000 # arbitrary, really
                    share = self._allocate_slot_share(bucketdir, secrets,
                                                      sharenum,
                                                      allocated_size,
                                                      owner_num=0)
                    shares[sharenum] = share
                shares[sharenum].writev(datav, new_length)
            # and update the leases on all shares
            ownerid = 1 # TODO
            expire_time = time.time() + 31*24*60*60 # one month
            my_nodeid = self.my_nodeid
            lease_info = (ownerid, expire_time, renew_secret, cancel_secret,
                          my_nodeid)
            for share in shares.values():
                share.add_or_renew_lease(lease_info)

        # all done
        return (testv_is_good, read_data)
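
    # Illustrative note (ours): in the method above, test_and_write_vectors
    # maps a share number to (testv, datav, new_length), e.g.
    #   { 0: ( [(0, 4, "eq", "abcd")],  # testv: compare 4 bytes at offset 0
    #          [(0, "new-data")],       # datav: write "new-data" at offset 0
    #          None ) }                 # new_length: leave the size alone
    # and read_vector is a list of (offset, length) pairs, applied to every
    # existing share and returned even when the tests fail.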

    def _allocate_slot_share(self, bucketdir, secrets, sharenum,
                             allocated_size, owner_num=0):
        (write_enabler, renew_secret, cancel_secret) = secrets
        my_nodeid = self.my_nodeid
        fileutil.make_dirs(bucketdir)
        filename = os.path.join(bucketdir, "%d" % sharenum)
        share = create_mutable_sharefile(filename, my_nodeid, write_enabler)
        return share

    def remote_slot_readv(self, storage_index, shares, readv):
        si_s = idlib.b2a(storage_index)
        # shares exist if there is a file for them
        bucketdir = os.path.join(self.sharedir, si_s)
        if not os.path.isdir(bucketdir):
            return {}
        datavs = {}
        for sharenum_s in os.listdir(bucketdir):
            try:
                sharenum = int(sharenum_s)
            except ValueError:
                continue
            if sharenum in shares or not shares:
                filename = os.path.join(bucketdir, sharenum_s)
                msf = MutableShareFile(filename)
                datavs[sharenum] = msf.readv(readv)
        return datavs

# the code before here runs on the storage server, not the client
# the code beyond here runs on the client, not the storage server

"""
Share data is written into a single file. At the start of the file, there is
a series of four-byte big-endian offset values, which indicate where each
section starts. Each offset is measured from the beginning of the file.

0x00: version number (=00 00 00 01)
0x04: segment size
0x08: data size
0x0c: offset of data (=00 00 00 24)
0x10: offset of plaintext_hash_tree
0x14: offset of crypttext_hash_tree
0x18: offset of block_hashes
0x1c: offset of share_hashes
0x20: offset of uri_extension_length + uri_extension
0x24: start of data
?   : start of plaintext_hash_tree
?   : start of crypttext_hash_tree
?   : start of block_hashes
?   : start of share_hashes
       each share_hash is written as a two-byte (big-endian) hashnum
       followed by the 32-byte SHA-256 hash. We only store the hashes
       necessary to validate the share hash root
?   : start of uri_extension_length (four-byte big-endian value)
?   : start of uri_extension
"""
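
# An illustrative sketch (ours), mirroring ReadBucketProxy._parse_offsets
# below: recover the section offsets from the 0x24-byte header described
# above.
def _example_parse_offset_table(header):
    assert len(header) == 0x24
    (version, segment_size, data_size) = struct.unpack(">LLL", header[:0xc])
    offsets = {}
    x = 0x0c
    for field in ('data', 'plaintext_hash_tree', 'crypttext_hash_tree',
                  'block_hashes', 'share_hashes', 'uri_extension'):
        (offsets[field],) = struct.unpack(">L", header[x:x+4])
        x += 4
    return (version, segment_size, data_size, offsets)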

def allocated_size(data_size, num_segments, num_share_hashes,
                   uri_extension_size):
    wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes,
                           uri_extension_size)
    uri_extension_starts_at = wbp._offsets['uri_extension']
    return uri_extension_starts_at + 4 + uri_extension_size

class WriteBucketProxy:
    implements(IStorageBucketWriter)

    def __init__(self, rref, data_size, segment_size, num_segments,
                 num_share_hashes, uri_extension_size):
        self._rref = rref
        self._data_size = data_size
        self._segment_size = segment_size
        self._num_segments = num_segments

        effective_segments = mathutil.next_power_of_k(num_segments,2)
        self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
        # how many share hashes are included in each share? This will be
        # about ln2(num_shares).
        self._share_hash_size = num_share_hashes * (2+HASH_SIZE)
        # we commit to not sending a uri extension larger than this
        self._uri_extension_size = uri_extension_size

        offsets = self._offsets = {}
        x = 0x24
        offsets['data'] = x
        x += data_size
        offsets['plaintext_hash_tree'] = x
        x += self._segment_hash_size
        offsets['crypttext_hash_tree'] = x
        x += self._segment_hash_size
        offsets['block_hashes'] = x
        x += self._segment_hash_size
        offsets['share_hashes'] = x
        x += self._share_hash_size
        offsets['uri_extension'] = x

        offset_data = struct.pack(">LLLLLLLLL",
                                  1, # version number
                                  segment_size,
                                  data_size,
                                  offsets['data'],
                                  offsets['plaintext_hash_tree'],
                                  offsets['crypttext_hash_tree'],
                                  offsets['block_hashes'],
                                  offsets['share_hashes'],
                                  offsets['uri_extension'],
                                  )
        assert len(offset_data) == 0x24
        self._offset_data = offset_data

    def start(self):
        return self._write(0, self._offset_data)

    def put_block(self, segmentnum, data):
        offset = self._offsets['data'] + segmentnum * self._segment_size
        assert offset + len(data) <= self._offsets['uri_extension']
        assert isinstance(data, str)
        if segmentnum < self._num_segments-1:
            precondition(len(data) == self._segment_size,
                         len(data), self._segment_size)
        else:
            precondition(len(data) == (self._data_size -
                                       (self._segment_size *
                                        (self._num_segments - 1))),
                         len(data), self._segment_size)
        return self._write(offset, data)

    def put_plaintext_hashes(self, hashes):
        offset = self._offsets['plaintext_hash_tree']
        assert isinstance(hashes, list)
        data = "".join(hashes)
        precondition(len(data) == self._segment_hash_size,
                     len(data), self._segment_hash_size)
        precondition(offset+len(data) <= self._offsets['crypttext_hash_tree'],
                     offset, len(data), offset+len(data),
                     self._offsets['crypttext_hash_tree'])
        return self._write(offset, data)

    def put_crypttext_hashes(self, hashes):
        offset = self._offsets['crypttext_hash_tree']
        assert isinstance(hashes, list)
        data = "".join(hashes)
        precondition(len(data) == self._segment_hash_size,
                     len(data), self._segment_hash_size)
        precondition(offset + len(data) <= self._offsets['block_hashes'],
                     offset, len(data), offset+len(data),
                     self._offsets['block_hashes'])
        return self._write(offset, data)

    def put_block_hashes(self, blockhashes):
        offset = self._offsets['block_hashes']
        assert isinstance(blockhashes, list)
        data = "".join(blockhashes)
        precondition(len(data) == self._segment_hash_size,
                     len(data), self._segment_hash_size)
        precondition(offset + len(data) <= self._offsets['share_hashes'],
                     offset, len(data), offset+len(data),
                     self._offsets['share_hashes'])
        return self._write(offset, data)

    def put_share_hashes(self, sharehashes):
        # sharehashes is a list of (index, hash) tuples, so they get stored
        # as 2+32=34 bytes each
        offset = self._offsets['share_hashes']
        assert isinstance(sharehashes, list)
        data = "".join([struct.pack(">H", hashnum) + hashvalue
                        for hashnum,hashvalue in sharehashes])
        precondition(len(data) == self._share_hash_size,
                     len(data), self._share_hash_size)
        precondition(offset + len(data) <= self._offsets['uri_extension'],
                     offset, len(data), offset+len(data),
                     self._offsets['uri_extension'])
        return self._write(offset, data)

    def put_uri_extension(self, data):
        offset = self._offsets['uri_extension']
        assert isinstance(data, str)
        precondition(len(data) <= self._uri_extension_size,
                     len(data), self._uri_extension_size)
        length = struct.pack(">L", len(data))
        return self._write(offset, length+data)

    def _write(self, offset, data):
        # TODO: for small shares, buffer the writes and do just a single call
        return self._rref.callRemote("write", offset, data)

    def close(self):
        return self._rref.callRemote("close")
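
# Illustrative usage sketch (ours): writes must respect the layout above,
# and 'rref' must be a live RIBucketWriter remote reference. The data
# values shown are placeholders.
#
#   wbp = WriteBucketProxy(rref, data_size, segment_size, num_segments,
#                          num_share_hashes, uri_extension_size)
#   d = wbp.start()
#   d.addCallback(lambda ign: wbp.put_block(0, block0_data))
#   d.addCallback(lambda ign: wbp.put_block_hashes(block_hashes))
#   d.addCallback(lambda ign: wbp.put_share_hashes(share_hashes))
#   d.addCallback(lambda ign: wbp.put_uri_extension(ue_data))
#   d.addCallback(lambda ign: wbp.close())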

class ReadBucketProxy:
    implements(IStorageBucketReader)

    def __init__(self, rref):
        self._rref = rref
        self._started = False

    def startIfNecessary(self):
        if self._started:
            return defer.succeed(self)
        d = self.start()
        d.addCallback(lambda res: self)
        return d

    def start(self):
        self._started = True
        # TODO: for small shares, read the whole bucket in start()
        d = self._read(0, 0x24)
        d.addCallback(self._parse_offsets)
        return d

    def _parse_offsets(self, data):
        precondition(len(data) == 0x24)
        self._offsets = {}
        (version, self._segment_size, self._data_size) = \
         struct.unpack(">LLL", data[0:0xc])
        _assert(version == 1)
        x = 0x0c
        for field in ( 'data',
                       'plaintext_hash_tree',
                       'crypttext_hash_tree',
                       'block_hashes',
                       'share_hashes',
                       'uri_extension',
                       ):
            offset = struct.unpack(">L", data[x:x+4])[0]
            x += 4
            self._offsets[field] = offset
        return self._offsets

    def get_block(self, blocknum):
        num_segments = mathutil.div_ceil(self._data_size, self._segment_size)
        if blocknum < num_segments-1:
            size = self._segment_size
        else:
            size = self._data_size % self._segment_size
            if size == 0:
                size = self._segment_size
        offset = self._offsets['data'] + blocknum * self._segment_size
        return self._read(offset, size)

    def _str2l(self, s):
        """ split string (pulled from storage) into a list of blockids """
        return [ s[i:i+HASH_SIZE]
                 for i in range(0, len(s), HASH_SIZE) ]

    def get_plaintext_hashes(self):
        offset = self._offsets['plaintext_hash_tree']
        size = self._offsets['crypttext_hash_tree'] - offset
        d = self._read(offset, size)
        d.addCallback(self._str2l)
        return d

    def get_crypttext_hashes(self):
        offset = self._offsets['crypttext_hash_tree']
        size = self._offsets['block_hashes'] - offset
        d = self._read(offset, size)
        d.addCallback(self._str2l)
        return d

    def get_block_hashes(self):
        offset = self._offsets['block_hashes']
        size = self._offsets['share_hashes'] - offset
        d = self._read(offset, size)
        d.addCallback(self._str2l)
        return d

    def get_share_hashes(self):
        offset = self._offsets['share_hashes']
        size = self._offsets['uri_extension'] - offset
        assert size % (2+HASH_SIZE) == 0
        d = self._read(offset, size)
        def _unpack_share_hashes(data):
            assert len(data) == size
            hashes = []
            for i in range(0, size, 2+HASH_SIZE):
                hashnum = struct.unpack(">H", data[i:i+2])[0]
                hashvalue = data[i+2:i+2+HASH_SIZE]
                hashes.append( (hashnum, hashvalue) )
            return hashes
        d.addCallback(_unpack_share_hashes)
        return d

    def get_uri_extension(self):
        offset = self._offsets['uri_extension']
        d = self._read(offset, 4)
        def _got_length(data):
            length = struct.unpack(">L", data)[0]
            return self._read(offset+4, length)
        d.addCallback(_got_length)
        return d

    def _read(self, offset, length):
        return self._rref.callRemote("read", offset, length)
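
# Illustrative usage sketch (ours), assuming 'rref' is a live RIBucketReader
# remote reference:
#
#   rbp = ReadBucketProxy(rref)
#   d = rbp.startIfNecessary() # fetches and parses the offset table once
#   d.addCallback(lambda rbp: rbp.get_block(0))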