1 import os, re, weakref, stat, struct, time
3 from foolscap import Referenceable
4 from twisted.application import service
6 from zope.interface import implements
7 from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
8 RIBucketReader, BadWriteEnablerError, IStatsProducer
9 from allmydata.util import base32, fileutil, idlib, log, time_format
10 from allmydata.util.assertutil import precondition
11 import allmydata # for __version__
13 class DataTooLargeError(Exception):
17 # storage/shares/incoming
18 # incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
19 # be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
20 # storage/shares/$START/$STORAGEINDEX
21 # storage/shares/$START/$STORAGEINDEX/$SHARENUM
# Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2 base-32 chars), and
26 # $SHARENUM matches this regex:
27 NUM_RE=re.compile("^[0-9]+$")
29 # each share file (in storage/shares/$SI/$SHNUM) contains lease information
30 # and share data. The share data is accessed by RIBucketWriter.write and
31 # RIBucketReader.read . The lease information is not accessible through these
34 # The share file has the following layout:
35 # 0x00: share file version number, four bytes, current version is 1
36 # 0x04: share data length, four bytes big-endian = A # See Footnote 1 below.
37 # 0x08: number of leases, four bytes big-endian
38 # 0x0c: beginning of share data (see immutable.layout.WriteBucketProxy)
39 # A+0x0c = B: first lease. Lease format is:
40 # B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner
41 # B+0x04: renew secret, 32 bytes (SHA256)
42 # B+0x24: cancel secret, 32 bytes (SHA256)
43 # B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch
44 # B+0x48: next lease, or end of record
46 # Footnote 1: as of Tahoe v1.3.0 this field is not used by storage servers, but it is still
47 # filled in by storage servers in case the storage server software gets downgraded from >= Tahoe
48 # v1.3.0 to < Tahoe v1.3.0, or the share file is moved from one storage server to another. The
# value stored in this field is truncated, so if the actual share data length is >= 2**32, then
50 # the value stored in this field will be the actual share data length modulo 2**32.
def si_b2a(storageindex):
    """Return the base32 ASCII rendering of a binary storage index."""
    return base32.b2a(storageindex)
def si_a2b(ascii_storageindex):
    """Decode a base32 ASCII storage index back into its binary form."""
    return base32.a2b(ascii_storageindex)
def storage_index_to_dir(storageindex):
    """Map a binary storage index to its two-level on-disk directory,
    i.e. the first two base32 chars as a prefix dir, then the full SI."""
    ascii_si = si_b2a(storageindex)
    prefix = ascii_si[:2]
    return os.path.join(prefix, ascii_si)
def __init__(self, owner_num=None, renew_secret=None, cancel_secret=None,
             expiration_time=None, nodeid=None):
    """Container for one lease record.

    owner_num: small int, 0 is reserved for 'no owner'.
    renew_secret/cancel_secret: 32-byte shared secrets.
    expiration_time: seconds-since-epoch.
    nodeid: 20-byte binary nodeid of the server that accepted the secrets,
            or None (immutable lease records do not carry one).
    """
    self.owner_num = owner_num
    self.renew_secret = renew_secret
    self.cancel_secret = cancel_secret
    self.expiration_time = expiration_time
    if nodeid is not None:
        assert isinstance(nodeid, str)
        assert len(nodeid) == 20
    # BUGFIX: nodeid was validated but never stored, so to_mutable_data()
    # (which packs self.nodeid) would fail with AttributeError.
    self.nodeid = nodeid
def from_immutable_data(self, data):
    """Fill in self from an immutable-share lease record; returns self so
    callers can write LeaseInfo().from_immutable_data(data)."""
    (self.owner_num,
     self.renew_secret,
     self.cancel_secret,
     self.expiration_time) = struct.unpack(">L32s32sL", data)
    # immutable lease records do not carry a nodeid
    self.nodeid = None
    return self
def to_immutable_data(self):
    """Serialize this lease in the immutable-share record format
    (owner, renew secret, cancel secret, expiration)."""
    # BUGFIX: owner_num was missing from the pack arguments, so the
    # 4-field format string was given only 3 values (struct.error).
    return struct.pack(">L32s32sL",
                       self.owner_num,
                       self.renew_secret, self.cancel_secret,
                       int(self.expiration_time))
def to_mutable_data(self):
    """Serialize this lease in the mutable-share record format
    (owner, expiration, renew secret, cancel secret, accepting nodeid)."""
    # BUGFIX: owner_num and nodeid were missing from the pack arguments,
    # so the 5-field format string was given only 3 values.
    return struct.pack(">LL32s32s20s",
                       self.owner_num,
                       int(self.expiration_time),
                       self.renew_secret, self.cancel_secret,
                       self.nodeid)
def from_mutable_data(self, data):
    """Fill in self from a mutable-share lease record; returns self so
    callers can write LeaseInfo().from_mutable_data(data)."""
    (self.owner_num,
     self.expiration_time,
     self.renew_secret, self.cancel_secret,
     self.nodeid) = struct.unpack(">LL32s32s20s", data)
    return self
102 LEASE_SIZE = struct.calcsize(">L32s32sL")
def __init__(self, filename, max_size=None, create=False):
    """If max_size is not None then I won't allow more than max_size bytes
    to be written to me. If create=True then max_size must not be None."""
    precondition((max_size is not None) or (not create), max_size, create)
    # BUGFIX: self.home was never assigned, yet every other method (and
    # the rest of this constructor) reads it.
    self.home = filename
    self._max_size = max_size
    # BUGFIX: the create-vs-open branching was missing; the code both
    # asserted the file did not exist and then opened it for reading.
    if create:
        # touch the file, so later callers will see that we're working on
        # it. Also construct the metadata.
        assert not os.path.exists(self.home)
        fileutil.make_dirs(os.path.dirname(self.home))
        f = open(self.home, 'wb')
        # The second field -- the four-byte share data length -- is no
        # longer used as of Tahoe v1.3.0, but we continue to write it in
        # there in case someone downgrades a storage server from >=
        # Tahoe-1.3.0 to < Tahoe-1.3.0, or moves a share file from one
        # server to another, etc. We do saturation -- a share data length
        # larger than 2**32-1 (what can fit into the field) is marked as
        # the largest length that can fit into the field. That way, even
        # if this does happen, the old < v1.3.0 server will still allow
        # clients to read the first part of the share.
        f.write(struct.pack(">LLL", 1, min(2**32-1, max_size), 0))
        f.close()
        self._lease_offset = max_size + 0x0c
        self._num_leases = 0
    else:
        f = open(self.home, 'rb')
        filesize = os.path.getsize(self.home)
        (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc))
        f.close()
        assert version == 1, version
        self._num_leases = num_leases
        self._lease_offset = filesize - (num_leases * self.LEASE_SIZE)
    self._data_offset = 0xc
def read_share_data(self, offset, length):
    """Read up to `length` bytes of share data starting at `offset`.

    Reads beyond the end of the data are truncated. Reads that start
    beyond the end of the data return an empty string."""
    precondition(offset >= 0)
    seekpos = self._data_offset + offset
    fsize = os.path.getsize(self.home)
    actuallength = max(0, min(length, fsize - seekpos))
    if actuallength == 0:
        # BUGFIX: the empty-read early return was missing, so the guard
        # had no body.
        return ""
    f = open(self.home, 'rb')
    # BUGFIX: the file was read from position 0 instead of being
    # positioned at the requested offset first.
    f.seek(seekpos)
    return f.read(actuallength)
def write_share_data(self, offset, data):
    """Write `data` at `offset` within the share-data region. Raises
    DataTooLargeError if the write would exceed our declared max_size."""
    # BUGFIX: `length` was referenced without ever being computed.
    length = len(data)
    precondition(offset >= 0, offset)
    if self._max_size is not None and offset+length > self._max_size:
        raise DataTooLargeError(self._max_size, offset, length)
    f = open(self.home, 'rb+')
    real_offset = self._data_offset + offset
    # BUGFIX: the seek and the actual write were both missing, and the
    # file handle was never closed.
    f.seek(real_offset)
    assert f.tell() == real_offset
    f.write(data)
    f.close()
def _write_lease_record(self, f, lease_number, lease_info):
    """Serialize lease_info into lease slot `lease_number` of open file f."""
    offset = self._lease_offset + lease_number * self.LEASE_SIZE
    # BUGFIX: the seek was missing, so the assert on f.tell() could only
    # pass by accident and the record landed at the current position.
    f.seek(offset)
    assert f.tell() == offset
    f.write(lease_info.to_immutable_data())
def _read_num_leases(self, f):
    """Return the lease count stored in the header (offset 0x08) of f."""
    # BUGFIX: the seek to the lease-count field was missing (the count
    # was read from wherever the file handle happened to be), and the
    # value was never returned.
    f.seek(0x08)
    (num_leases,) = struct.unpack(">L", f.read(4))
    return num_leases
def _write_num_leases(self, f, num_leases):
    """Store num_leases into the header lease-count field (offset 0x08)."""
    # BUGFIX: the seek was missing, so the count was written at the
    # current file position instead of into the header.
    f.seek(0x08)
    f.write(struct.pack(">L", num_leases))
def _truncate_leases(self, f, num_leases):
    """Chop the file off just past the last of `num_leases` lease records."""
    new_end = self._lease_offset + num_leases * self.LEASE_SIZE
    f.truncate(new_end)
def iter_leases(self):
    """Yields a LeaseInfo (ownernum, renew_secret, cancel_secret,
    expiration_time) for each lease in this share file."""
    # BUGFIX: the docstring above was left unterminated in the original.
    f = open(self.home, 'rb')
    (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc))
    f.seek(self._lease_offset)
    for i in range(num_leases):
        data = f.read(self.LEASE_SIZE)
        # guard against a short/truncated final record
        if data:
            yield LeaseInfo().from_immutable_data(data)
def add_lease(self, lease_info):
    """Append lease_info as a new lease record and bump the stored count."""
    f = open(self.home, 'rb+')
    num_leases = self._read_num_leases(f)
    self._write_lease_record(f, num_leases, lease_info)
    self._write_num_leases(f, num_leases+1)
    # BUGFIX: the handle was leaked (never closed), so buffered writes
    # could be deferred and the descriptor lingered.
    f.close()
def renew_lease(self, renew_secret, new_expire_time):
    """Extend the expiration time of the lease matching renew_secret.
    Raises IndexError if no lease carries that secret."""
    for i, lease in enumerate(self.iter_leases()):
        if lease.renew_secret == renew_secret:
            # yup. See if we need to update the owner time.
            if new_expire_time > lease.expiration_time:
                # yes
                lease.expiration_time = new_expire_time
                f = open(self.home, 'rb+')
                self._write_lease_record(f, i, lease)
                # BUGFIX: the handle was never closed.
                f.close()
            # BUGFIX: the `return` after a successful match was missing,
            # so every renewal fell through to the IndexError below.
            return
    raise IndexError("unable to renew non-existent lease")
def add_or_renew_lease(self, lease_info):
    """Renew the lease matching lease_info's renew secret; if there is no
    such lease, add lease_info as a brand-new lease instead."""
    # BUGFIX: the try/except IndexError was missing, so a successful
    # renewal was followed by an unconditional (duplicate) add, and a
    # failed renewal raised instead of falling back to add_lease.
    try:
        self.renew_lease(lease_info.renew_secret,
                         lease_info.expiration_time)
    except IndexError:
        self.add_lease(lease_info)
def cancel_lease(self, cancel_secret):
    """Remove a lease with the given cancel_secret. If the last lease is
    cancelled, the file will be removed. Return the number of bytes that
    were freed (by truncating the list of leases, and possibly by
    deleting the file). Raise IndexError if there was no lease with the
    given cancel_secret.
    """
    leases = list(self.iter_leases())
    num_leases = len(leases)
    num_leases_removed = 0
    for i, lease in enumerate(leases[:]):
        if lease.cancel_secret == cancel_secret:
            # BUGFIX: the matched lease was counted but never blanked
            # out, so nothing was actually removed.
            leases[i] = None
            num_leases_removed += 1
    if not num_leases_removed:
        raise IndexError("unable to find matching lease to cancel")
    if num_leases_removed:
        # pack and write out the remaining leases. We write these out in
        # the same order as they were added, so that if we crash while
        # doing this, we won't lose any non-cancelled leases.
        leases = [l for l in leases if l]  # remove the cancelled leases
        f = open(self.home, 'rb+')
        for i, lease in enumerate(leases):
            self._write_lease_record(f, i, lease)
        self._write_num_leases(f, len(leases))
        self._truncate_leases(f, len(leases))
        f.close()
    space_freed = self.LEASE_SIZE * num_leases_removed
    # BUGFIX: the delete-on-last-lease path and the return value were
    # missing entirely.
    if not len(leases):
        space_freed += os.stat(self.home)[stat.ST_SIZE]
        os.unlink(self.home)
    return space_freed
class BucketWriter(Referenceable):
    """Remotely-drivable writer for one incoming immutable share. Writes
    land in incominghome; remote_close() moves the finished share to
    finalhome and reports it to the StorageServer."""
    implements(RIBucketWriter)

    def __init__(self, ss, incominghome, finalhome, max_size, lease_info, canary):
        # BUGFIX: self.ss and self.closed were never initialized, yet
        # every other method reads them.
        self.ss = ss
        self.incominghome = incominghome
        self.finalhome = finalhome
        self._max_size = max_size # don't allow the client to write more than this
        self._canary = canary
        self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)
        self.closed = False
        self.throw_out_all_data = False
        self._sharefile = ShareFile(incominghome, create=True, max_size=max_size)
        # also, add our lease to the file now, so that other ones can be
        # added by simultaneous uploaders
        self._sharefile.add_lease(lease_info)

    def allocated_size(self):
        return self._max_size

    def remote_write(self, offset, data):
        # BUGFIX: `start` was never set, and the discard-storage path
        # fell through into the real write instead of returning.
        start = time.time()
        precondition(not self.closed)
        if self.throw_out_all_data:
            return
        self._sharefile.write_share_data(offset, data)
        self.ss.add_latency("write", time.time() - start)
        self.ss.count("write")

    def remote_close(self):
        precondition(not self.closed)
        start = time.time()

        fileutil.make_dirs(os.path.dirname(self.finalhome))
        fileutil.rename(self.incominghome, self.finalhome)
        # BUGFIX: the `try:` matching the except clause below was missing.
        try:
            # self.incominghome is like storage/shares/incoming/ab/abcde/4 .
            # We try to delete the parent (.../ab/abcde) to avoid leaving
            # these directories lying around forever, but the delete might
            # fail if we're working on another share for the same storage
            # index (like ab/abcde/5). The alternative approach would be to
            # use a hierarchy of objects (PrefixHolder, BucketHolder,
            # ShareWriter), each of which is responsible for a single
            # directory on disk, and have them use reference counting of
            # their children to know when they should do the rmdir. This
            # approach is simpler, but relies on os.rmdir refusing to delete
            # a non-empty directory. Do *not* use fileutil.rm_dir() here!
            os.rmdir(os.path.dirname(self.incominghome))
            # we also delete the grandparent (prefix) directory, .../ab ,
            # again to avoid leaving directories lying around. This might
            # fail if there is another bucket open that shares a prefix (like
            # ab/abfff).
            os.rmdir(os.path.dirname(os.path.dirname(self.incominghome)))
            # we leave the great-grandparent (incoming/) directory in place.
        except EnvironmentError:
            # ignore the "can't rmdir because the directory is not empty"
            # exceptions, those are normal consequences of the
            # above-mentioned conditions.
            pass
        self._sharefile = None
        self.closed = True
        self._canary.dontNotifyOnDisconnect(self._disconnect_marker)

        filelen = os.stat(self.finalhome)[stat.ST_SIZE]
        self.ss.bucket_writer_closed(self, filelen)
        self.ss.add_latency("close", time.time() - start)
        self.ss.count("close")

    def _disconnected(self):
        # the client went away without closing: discard the partial share
        if not self.closed:
            self._abort()

    def remote_abort(self):
        log.msg("storage: aborting sharefile %s" % self.incominghome,
                facility="tahoe.storage", level=log.UNUSUAL)
        if not self.closed:
            self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
        self._abort()
        self.ss.count("abort")

    def _abort(self):
        # BUGFIX: this method's `def` line was missing, leaving its body
        # orphaned at class scope.
        if self.closed:
            return
        os.remove(self.incominghome)
        # if we were the last share to be moved, remove the incoming/
        # directory that was our parent
        parentdir = os.path.split(self.incominghome)[0]
        if not os.listdir(parentdir):
            os.rmdir(parentdir)
        self.closed = True
class BucketReader(Referenceable):
    """Remotely-drivable reader for one finished immutable share."""
    implements(RIBucketReader)

    def __init__(self, ss, sharefname, storage_index=None, shnum=None):
        # BUGFIX: self.ss and self.shnum were never stored, yet
        # remote_read/__repr__/remote_advise_corrupt_share read them.
        self.ss = ss
        self._share_file = ShareFile(sharefname)
        self.storage_index = storage_index
        self.shnum = shnum

    def __repr__(self):
        # BUGFIX: the `def __repr__` line was missing, leaving its return
        # statement orphaned.
        return "<%s %s %s>" % (self.__class__.__name__,
                               base32.b2a_l(self.storage_index[:8], 60),
                               self.shnum)

    def remote_read(self, offset, length):
        # BUGFIX: `start` was never set and the data was never returned.
        start = time.time()
        data = self._share_file.read_share_data(offset, length)
        self.ss.add_latency("read", time.time() - start)
        self.ss.count("read")
        return data

    def remote_advise_corrupt_share(self, reason):
        return self.ss.remote_advise_corrupt_share("immutable",
                                                   self.storage_index,
                                                   self.shnum,
                                                   reason)
376 # the MutableShareFile is like the ShareFile, but used for mutable data. It
377 # has a different layout. See docs/mutable.txt for more details.
380 # 1 0 32 magic verstr "tahoe mutable container v1" plus binary
381 # 2 32 20 write enabler's nodeid
382 # 3 52 32 write enabler
383 # 4 84 8 data size (actual share data present) (a)
384 # 5 92 8 offset of (8) count of extra leases (after data)
385 # 6 100 368 four leases, 92 bytes each
386 # 0 4 ownerid (0 means "no lease here")
#                        4    4   expiration timestamp
#                        8   32   renewal token
#                        40  32   cancel token
#                        72  20   nodeid which accepted the tokens
# 7   468       (a)     data
# 8   ??        4       count of extra leases
# 9   ??        n*92    extra leases
# Sanity-check the struct format characters the layout above relies on:
# with big-endian ("standard") sizing, "L" is exactly 4 bytes and "Q" is
# exactly 8 bytes.
# BUGFIX: `assert struct.calcsize("L"), 4` never checks equality -- the
# second expression is the assert *message* -- and native (unprefixed)
# "L" is platform-dependent (8 bytes on LP64 systems). Compare the
# ">"-prefixed standard sizes for real.
assert struct.calcsize(">L") == 4, struct.calcsize(">L")
assert struct.calcsize(">Q") == 8, struct.calcsize(">Q")
class MutableShareFile:
    """On-disk container for one mutable share: header, write enabler,
    four in-line lease slots, share data, then any extra leases. See the
    layout comment above and docs/mutable.txt."""

    DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
    EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
    HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
    LEASE_SIZE = struct.calcsize(">LL32s32s20s")
    assert LEASE_SIZE == 92
    DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
    assert DATA_OFFSET == 468, DATA_OFFSET
    # our sharefiles start with a recognizable string, plus some random
    # binary data to reduce the chance that a regular text file will look
    # like a sharefile.
    MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
    assert len(MAGIC) == 32
    MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary
    # TODO: decide upon a policy for max share size

    def __init__(self, filename, parent=None):
        # BUGFIX: self.home was never assigned; the unpack target below
        # was missing its opening "(magic," element.
        self.home = filename
        if os.path.exists(self.home):
            # we don't cache anything, just check the magic
            f = open(self.home, 'rb')
            data = f.read(self.HEADER_SIZE)
            (magic,
             write_enabler_nodeid, write_enabler,
             data_length, extra_lease_offset) = \
             struct.unpack(">32s20s32sQQ", data)
            assert magic == self.MAGIC
        self.parent = parent # for logging

    def log(self, *args, **kwargs):
        return self.parent.log(*args, **kwargs)

    def create(self, my_nodeid, write_enabler):
        """Write a fresh, empty container: header, four blank leases, and
        a zero extra-lease count."""
        assert not os.path.exists(self.home)
        # BUGFIX: data_length and num_extra_leases were never initialized
        # and the file handle was never closed.
        data_length = 0
        extra_lease_offset = (self.HEADER_SIZE
                              + 4 * self.LEASE_SIZE
                              + data_length)
        assert extra_lease_offset == self.DATA_OFFSET # true at creation
        num_extra_leases = 0
        f = open(self.home, 'wb')
        header = struct.pack(">32s20s32sQQ",
                             self.MAGIC, my_nodeid, write_enabler,
                             data_length, extra_lease_offset,
                             )
        leases = ("\x00"*self.LEASE_SIZE) * 4
        f.write(header + leases)
        # data goes here, empty after creation
        f.write(struct.pack(">L", num_extra_leases))
        # extra leases go here, none at creation
        f.close()

    def unlink(self):
        # NOTE(review): cancel_lease() relies on this helper; the original
        # definition fell in an elided region -- confirm against upstream.
        os.unlink(self.home)

    def _read_data_length(self, f):
        f.seek(self.DATA_LENGTH_OFFSET)
        (data_length,) = struct.unpack(">Q", f.read(8))
        return data_length

    def _write_data_length(self, f, data_length):
        f.seek(self.DATA_LENGTH_OFFSET)
        f.write(struct.pack(">Q", data_length))

    def _read_share_data(self, f, offset, length):
        precondition(offset >= 0)
        data_length = self._read_data_length(f)
        if offset+length > data_length:
            # reads beyond the end of the data are truncated. Reads that
            # start beyond the end of the data return an empty string.
            length = max(0, data_length-offset)
        if length == 0:
            return ""
        precondition(offset+length <= data_length)
        f.seek(self.DATA_OFFSET+offset)
        data = f.read(length)
        return data

    def _read_extra_lease_offset(self, f):
        f.seek(self.EXTRA_LEASE_OFFSET)
        (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
        return extra_lease_offset

    def _write_extra_lease_offset(self, f, offset):
        f.seek(self.EXTRA_LEASE_OFFSET)
        f.write(struct.pack(">Q", offset))

    def _read_num_extra_leases(self, f):
        offset = self._read_extra_lease_offset(f)
        # BUGFIX: the seek to that offset was missing.
        f.seek(offset)
        (num_extra_leases,) = struct.unpack(">L", f.read(4))
        return num_extra_leases

    def _write_num_extra_leases(self, f, num_leases):
        extra_lease_offset = self._read_extra_lease_offset(f)
        f.seek(extra_lease_offset)
        f.write(struct.pack(">L", num_leases))

    def _change_container_size(self, f, new_container_size):
        if new_container_size > self.MAX_SIZE:
            raise DataTooLargeError()
        old_extra_lease_offset = self._read_extra_lease_offset(f)
        new_extra_lease_offset = self.DATA_OFFSET + new_container_size
        if new_extra_lease_offset < old_extra_lease_offset:
            # TODO: allow containers to shrink. For now they remain large.
            return
        num_extra_leases = self._read_num_extra_leases(f)
        f.seek(old_extra_lease_offset)
        extra_lease_data = f.read(4 + num_extra_leases * self.LEASE_SIZE)
        f.seek(new_extra_lease_offset)
        f.write(extra_lease_data)
        # an interrupt here will corrupt the leases, iff the move caused the
        # extra leases to overlap.
        self._write_extra_lease_offset(f, new_extra_lease_offset)

    def _write_share_data(self, f, offset, data):
        # BUGFIX: `length` was referenced but never computed, and the
        # final seek was not followed by the actual write.
        length = len(data)
        precondition(offset >= 0)
        data_length = self._read_data_length(f)
        extra_lease_offset = self._read_extra_lease_offset(f)

        if offset+length >= data_length:
            # They are expanding their data size.
            if self.DATA_OFFSET+offset+length > extra_lease_offset:
                # Their new data won't fit in the current container, so we
                # have to move the leases. With luck, they're expanding it
                # more than the size of the extra lease block, which will
                # minimize the corrupt-the-share window
                self._change_container_size(f, offset+length)
                extra_lease_offset = self._read_extra_lease_offset(f)

                # an interrupt here is ok.. the container has been enlarged
                # but the data remains untouched

            assert self.DATA_OFFSET+offset+length <= extra_lease_offset
            # Their data now fits in the current container. We must write
            # their new data and modify the recorded data size.
            new_data_length = offset+length
            self._write_data_length(f, new_data_length)
            # an interrupt here will result in a corrupted share

        # now all that's left to do is write out their data
        f.seek(self.DATA_OFFSET+offset)
        f.write(data)

    def _write_lease_record(self, f, lease_number, lease_info):
        extra_lease_offset = self._read_extra_lease_offset(f)
        num_extra_leases = self._read_num_extra_leases(f)
        # BUGFIX: the three-way slot selection (in-header slot, existing
        # extra slot, brand-new extra slot) lost its if/else skeleton.
        if lease_number < 4:
            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
        elif (lease_number-4) < num_extra_leases:
            offset = (extra_lease_offset
                      + 4  # skip the extra-lease count field
                      + (lease_number-4)*self.LEASE_SIZE)
        else:
            # must add an extra lease record
            self._write_num_extra_leases(f, num_extra_leases+1)
            offset = (extra_lease_offset
                      + 4
                      + (lease_number-4)*self.LEASE_SIZE)
        f.seek(offset)
        assert f.tell() == offset
        f.write(lease_info.to_mutable_data())

    def _read_lease_record(self, f, lease_number):
        # returns a LeaseInfo instance, or None
        extra_lease_offset = self._read_extra_lease_offset(f)
        num_extra_leases = self._read_num_extra_leases(f)
        if lease_number < 4:
            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
        elif (lease_number-4) < num_extra_leases:
            offset = (extra_lease_offset
                      + 4
                      + (lease_number-4)*self.LEASE_SIZE)
        else:
            raise IndexError("No such lease number %d" % lease_number)
        f.seek(offset)
        assert f.tell() == offset
        data = f.read(self.LEASE_SIZE)
        lease_info = LeaseInfo().from_mutable_data(data)
        if lease_info.owner_num == 0:
            return None
        return lease_info

    def _get_num_lease_slots(self, f):
        # how many places do we have allocated for leases? Not all of them
        # are filled.
        num_extra_leases = self._read_num_extra_leases(f)
        return 4+num_extra_leases

    def _get_first_empty_lease_slot(self, f):
        # return an int with the index of an empty slot, or None if we do not
        # currently have an empty slot
        for i in range(self._get_num_lease_slots(f)):
            if self._read_lease_record(f, i) is None:
                return i
        return None

    def _enumerate_leases(self, f):
        """Yields (leasenum, LeaseInfo) for every occupied lease slot."""
        for i in range(self._get_num_lease_slots(f)):
            try:
                data = self._read_lease_record(f, i)
                if data is not None:
                    yield (i, data)
            except IndexError:
                return

    def debug_get_leases(self):
        f = open(self.home, 'rb')
        leases = list(self._enumerate_leases(f))
        f.close()
        return leases

    def add_lease(self, lease_info):
        precondition(lease_info.owner_num != 0) # 0 means "no lease here"
        f = open(self.home, 'rb+')
        num_lease_slots = self._get_num_lease_slots(f)
        empty_slot = self._get_first_empty_lease_slot(f)
        if empty_slot is not None:
            self._write_lease_record(f, empty_slot, lease_info)
        else:
            # no free slot: append a new one past the last slot
            self._write_lease_record(f, num_lease_slots, lease_info)
        f.close()

    def renew_lease(self, renew_secret, new_expire_time):
        accepting_nodeids = set()
        f = open(self.home, 'rb+')
        for (leasenum, lease) in self._enumerate_leases(f):
            if lease.renew_secret == renew_secret:
                # yup. See if we need to update the owner time.
                if new_expire_time > lease.expiration_time:
                    # yes
                    lease.expiration_time = new_expire_time
                    self._write_lease_record(f, leasenum, lease)
                f.close()
                return
            accepting_nodeids.add(lease.nodeid)
        f.close()
        # Return the accepting_nodeids set, to give the client a chance to
        # update the leases on a share which has been migrated from its
        # original server to a new one.
        msg = ("Unable to renew non-existent lease. I have leases accepted by"
               " nodeids: ")
        msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
                         for anid in accepting_nodeids])
        msg += " ."
        raise IndexError(msg)

    def add_or_renew_lease(self, lease_info):
        precondition(lease_info.owner_num != 0) # 0 means "no lease here"
        try:
            self.renew_lease(lease_info.renew_secret,
                             lease_info.expiration_time)
        except IndexError:
            self.add_lease(lease_info)

    def cancel_lease(self, cancel_secret):
        """Remove any leases with the given cancel_secret. If the last lease
        is cancelled, the file will be removed. Return the number of bytes
        that were freed (by truncating the list of leases, and possibly by
        deleting the file). Raise IndexError if there was no lease with the
        given cancel_secret."""
        accepting_nodeids = set()
        modified = 0
        remaining = 0
        blank_lease = LeaseInfo(owner_num=0,
                                renew_secret="\x00"*32,
                                cancel_secret="\x00"*32,
                                expiration_time=0,
                                nodeid="\x00"*20)
        f = open(self.home, 'rb+')
        for (leasenum, lease) in self._enumerate_leases(f):
            accepting_nodeids.add(lease.nodeid)
            if lease.cancel_secret == cancel_secret:
                # blank out the slot rather than compacting the file
                self._write_lease_record(f, leasenum, blank_lease)
                modified += 1
            else:
                remaining += 1
        if modified:
            freed_space = self._pack_leases(f)
            f.close()
            if not remaining:
                freed_space += os.stat(self.home)[stat.ST_SIZE]
                self.unlink()
            return freed_space

        msg = ("Unable to cancel non-existent lease. I have leases "
               "accepted by nodeids: ")
        msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
                         for anid in accepting_nodeids])
        msg += " ."
        raise IndexError(msg)

    def _pack_leases(self, f):
        # TODO: reclaim space from cancelled leases
        return 0

    def _read_write_enabler_and_nodeid(self, f):
        f.seek(0)
        data = f.read(self.HEADER_SIZE)
        (magic,
         write_enabler_nodeid, write_enabler,
         data_length, extra_lease_offset) = \
         struct.unpack(">32s20s32sQQ", data)
        assert magic == self.MAGIC
        return (write_enabler, write_enabler_nodeid)

    def readv(self, readv):
        """Read a vector of (offset, length) spans; returns a list of strings."""
        datav = []
        f = open(self.home, 'rb')
        for (offset, length) in readv:
            datav.append(self._read_share_data(f, offset, length))
        f.close()
        return datav

#    def remote_get_length(self):
#        f = open(self.home, 'rb')
#        data_length = self._read_data_length(f)
#        f.close()
#        return data_length

    def check_write_enabler(self, write_enabler, si_s):
        f = open(self.home, 'rb+')
        (real_write_enabler, write_enabler_nodeid) = \
         self._read_write_enabler_and_nodeid(f)
        f.close()
        if write_enabler != real_write_enabler:
            # accommodate share migration by reporting the nodeid used for
            # the old write enabler.
            self.log(format="bad write enabler on SI %(si)s,"
                     " recorded by nodeid %(nodeid)s",
                     facility="tahoe.storage",
                     level=log.WEIRD, umid="cE1eBQ",
                     si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
            msg = "The write enabler was recorded by nodeid '%s'." % \
                  (idlib.nodeid_b2a(write_enabler_nodeid),)
            raise BadWriteEnablerError(msg)

    def check_testv(self, testv):
        test_good = True
        f = open(self.home, 'rb+')
        for (offset, length, operator, specimen) in testv:
            data = self._read_share_data(f, offset, length)
            if not testv_compare(data, operator, specimen):
                test_good = False
                break
        f.close()
        return test_good

    def writev(self, datav, new_length):
        f = open(self.home, 'rb+')
        for (offset, data) in datav:
            self._write_share_data(f, offset, data)
        if new_length is not None:
            self._change_container_size(f, new_length)
            f.seek(self.DATA_LENGTH_OFFSET)
            f.write(struct.pack(">Q", new_length))
        f.close()
764 def testv_compare(a, op, b):
765 assert op in ("lt", "le", "eq", "ne", "ge", "gt")
def check_testv(self, testv):
    """Evaluate a test vector against an empty (nonexistent) share: every
    read yields the empty string."""
    # BUGFIX: test_good/data initialization and the return were missing.
    test_good = True
    for (offset, length, operator, specimen) in testv:
        data = ""
        if not testv_compare(data, operator, specimen):
            test_good = False
            break
    return test_good
def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
    """Create a brand-new mutable sharefile on disk, then hand back a
    fresh MutableShareFile wrapper that does not share state with the
    instance used for creation."""
    creator = MutableShareFile(filename, parent)
    creator.create(my_nodeid, write_enabler)
    return MutableShareFile(filename, parent)
798 class StorageServer(service.MultiService, Referenceable):
799 implements(RIStorageServer, IStatsProducer)
def __init__(self, storedir, reserved_space=0,
             discard_storage=False, readonly_storage=False,
             stats_provider=None):
    """Storage server service.

    storedir: base directory for shares and advisories.
    reserved_space: bytes to keep free; we stop accepting shares below it.
    discard_storage: accept uploads but throw the data away (testing).
    readonly_storage: never accept new shares.
    """
    service.MultiService.__init__(self)
    self.storedir = storedir
    sharedir = os.path.join(storedir, "shares")
    fileutil.make_dirs(sharedir)
    self.sharedir = sharedir
    # we don't actually create the corruption-advisory dir until necessary
    self.corruption_advisory_dir = os.path.join(storedir,
                                                "corruption-advisories")
    self.reserved_space = int(reserved_space)
    self.no_storage = discard_storage
    self.readonly_storage = readonly_storage
    self.stats_provider = stats_provider
    if self.stats_provider:
        self.stats_provider.register_producer(self)
    self.incomingdir = os.path.join(sharedir, 'incoming')
    self._clean_incomplete()
    fileutil.make_dirs(self.incomingdir)
    self._active_writers = weakref.WeakKeyDictionary()
    log.msg("StorageServer created", facility="tahoe.storage")

    if self.reserved_space:
        if self.get_available_space() is None:
            # BUGFIX: the keyword was misspelled `umin=`; log.msg's
            # message-id keyword is `umid`.
            log.msg("warning: [storage]reserved_space= is set, but this platform does not support statvfs(2), so this reservation cannot be honored",
                    umid="0wZ27w", level=log.UNUSUAL)
    # BUGFIX: the latencies dict literal was left unterminated; restore
    # the full category set used by add_latency()/get_latencies().
    self.latencies = {"allocate": [], # immutable
                      "write": [],
                      "close": [],
                      "read": [],
                      "get": [],
                      "writev": [], # mutable
                      "readv": [],
                      "add-lease": [], # both
                      "renew": [],
                      "cancel": [],
                      }
def count(self, name, delta=1):
    """Bump the 'storage_server.<name>' counter by delta, if we have a
    stats_provider to report to."""
    if not self.stats_provider:
        return
    self.stats_provider.count("storage_server." + name, delta)
def add_latency(self, category, latency):
    """Record one latency sample (seconds) for the given category,
    keeping at most the most recent 1000 samples."""
    a = self.latencies[category]
    # BUGFIX: the sample was never appended, and the truncation ran
    # unconditionally instead of only when the list exceeded 1000.
    a.append(latency)
    if len(a) > 1000:
        self.latencies[category] = a[-1000:]
def get_latencies(self):
    """Return a dict, indexed by category, that contains a dict of
    latency numbers for each category. Each dict will contain the
    following keys: mean, 01_0_percentile, 10_0_percentile,
    50_0_percentile (median), 90_0_percentile, 95_0_percentile,
    99_0_percentile, 99_9_percentile. If no samples have been collected
    for the given category, then that category name will not be present
    in the return value."""
    # note that Amazon's Dynamo paper says they use 99.9% percentile.
    # BUGFIX: output/stats/count were never initialized, the samples were
    # never sorted (percentile indexing requires sorted order), empty
    # categories were not skipped, and nothing was returned.
    output = {}
    for category in self.latencies:
        if not self.latencies[category]:
            continue
        stats = {}
        samples = self.latencies[category][:]
        samples.sort()
        count = len(samples)
        stats["mean"] = sum(samples) / count
        stats["01_0_percentile"] = samples[int(0.01 * count)]
        stats["10_0_percentile"] = samples[int(0.1 * count)]
        stats["50_0_percentile"] = samples[int(0.5 * count)]
        stats["90_0_percentile"] = samples[int(0.9 * count)]
        stats["95_0_percentile"] = samples[int(0.95 * count)]
        stats["99_0_percentile"] = samples[int(0.99 * count)]
        stats["99_9_percentile"] = samples[int(0.999 * count)]
        output[category] = stats
    return output
def log(self, *args, **kwargs):
    """Forward to log.msg, defaulting the facility to tahoe.storage."""
    kwargs.setdefault("facility", "tahoe.storage")
    return log.msg(*args, **kwargs)
def setNodeID(self, nodeid):
    # Record our binary nodeid. Somebody must call this before any slots
    # can be created or leases added.
    self.my_nodeid = nodeid
def startService(self):
    """Twisted service start hook: adopt the nodeid from our parent Node."""
    service.MultiService.startService(self)
    our_nodeid = self.parent.nodeid # 20 bytes, binary
    assert len(our_nodeid) == 20
    self.setNodeID(our_nodeid)
def _clean_incomplete(self):
    # blow away any partial uploads left over from a previous run
    fileutil.rm_dir(self.incomingdir)
def get_stats(self):
    """Produce the numeric stats dict for our registered stats_provider
    (IStatsProducer). NOTE(review): the `def` line itself fell in an
    elided region -- the name is inferred from IStatsProducer usage."""
    # remember: RIStatsProvider requires that our return dict
    # contains numeric values.
    stats = { 'storage_server.allocated': self.allocated_size(), }
    for category, ld in self.get_latencies().items():
        for name, v in ld.items():
            stats['storage_server.latencies.%s.%s' % (category, name)] = v
    # BUGFIX: `writeable` was read below without ever being set, and the
    # statvfs block lacked its try: (the except clause was orphaned).
    writeable = True
    if self.readonly_storage:
        writeable = False
    try:
        s = os.statvfs(self.storedir)
        disk_total = s.f_bsize * s.f_blocks
        disk_used = s.f_bsize * (s.f_blocks - s.f_bfree)
        # spacetime predictors should look at the slope of disk_used.
        disk_avail = s.f_bsize * s.f_bavail # available to non-root users
        # include our local policy here: if we stop accepting shares when
        # the available space drops below 1GB, then include that fact in
        # our announced disk_avail value.
        disk_avail -= self.reserved_space
        disk_avail = max(disk_avail, 0)
        if self.readonly_storage:
            disk_avail = 0
        # spacetime predictors should use disk_avail / (d(disk_used)/dt)
        stats["storage_server.disk_total"] = disk_total
        stats["storage_server.disk_used"] = disk_used
        stats["storage_server.disk_avail"] = disk_avail
    except AttributeError:
        # os.statvfs is available only on unix
        pass
    stats["storage_server.accepting_immutable_shares"] = int(writeable)
    return stats
def stat_disk(self, d):
    """Return the number of bytes available to non-root users on the
    filesystem holding directory d (raises AttributeError on platforms
    without os.statvfs, e.g. Windows)."""
    # BUGFIX: `s` was referenced without calling statvfs, and the result
    # was never returned.
    s = os.statvfs(d)
    # s.f_bavail: available to non-root users
    disk_avail = s.f_bsize * s.f_bavail
    return disk_avail
def get_available_space(self):
    """Return the number of bytes we are willing to accept (available
    space minus reserved_space), or None if it cannot be measured
    (e.g. windows, where os.statvfs does not exist)."""
    # BUGFIX: the try: was missing (the except clause was orphaned) and
    # neither branch returned anything.
    try:
        disk_avail = self.stat_disk(self.storedir)
        disk_avail -= self.reserved_space
    except AttributeError:
        return None
    if self.readonly_storage:
        disk_avail = 0
    return disk_avail
def allocated_size(self):
    """Total bytes promised to currently-active BucketWriters."""
    # BUGFIX: the accumulator was never initialized and never returned.
    space = 0
    for bw in self._active_writers:
        space += bw.allocated_size()
    return space
def remote_get_version(self):
    """Report the storage/v1 protocol parameters and our software version."""
    remaining_space = self.get_available_space()
    if remaining_space is None:
        # we're on a platform that doesn't have 'df', so make a vague
        # guess.
        remaining_space = 2**64
    # BUGFIX: the dict literal was unterminated and never returned.
    version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                { "maximum-immutable-share-size": remaining_space,
                  "tolerates-immutable-read-overrun": True,
                  "delete-mutable-shares-with-zero-length-writev": True,
                  },
                "application-version": str(allmydata.__version__),
                }
    return version
def remote_allocate_buckets(self, storage_index,
                            renew_secret, cancel_secret,
                            sharenums, allocated_size,
                            canary, owner_num=0):
    """Allocate BucketWriters for the requested share numbers.

    Returns (alreadygot, bucketwriters): the set of shnums we already
    hold (with their leases added/renewed), and a dict of shnum ->
    BucketWriter for the new shares we agreed to accept."""
    # owner_num is not for clients to set, but rather it should be
    # curried into the PersonalStorageServer instance that is dedicated
    # to a particular owner.
    # BUGFIX: `start` and `alreadygot` were referenced but never
    # initialized, and several branch bodies (if limited:, the
    # per-shnum dispositions) were missing.
    start = time.time()
    self.count("allocate")
    alreadygot = set()
    bucketwriters = {} # k: shnum, v: BucketWriter
    si_dir = storage_index_to_dir(storage_index)
    si_s = si_b2a(storage_index)

    log.msg("storage: allocate_buckets %s" % si_s)

    # in this implementation, the lease information (including secrets)
    # goes into the share files themselves. It could also be put into a
    # separate database. Note that the lease should not be added until
    # the BucketWriter has been closed.
    expire_time = time.time() + 31*24*60*60
    lease_info = LeaseInfo(owner_num,
                           renew_secret, cancel_secret,
                           expire_time, self.my_nodeid)

    max_space_per_bucket = allocated_size

    remaining_space = self.get_available_space()
    limited = remaining_space is not None
    if limited:
        # this is a bit conservative, since some of this allocated_size()
        # has already been written to disk, where it will show up in
        # get_available_space.
        remaining_space -= self.allocated_size()

    # fill alreadygot with all shares that we have, not just the ones
    # they asked about: this will save them a lot of work. Add or update
    # leases for all of them: if they want us to hold shares for this
    # file, they'll want us to hold leases for this file.
    for (shnum, fn) in self._get_bucket_shares(storage_index):
        alreadygot.add(shnum)
        sf = ShareFile(fn)
        sf.add_or_renew_lease(lease_info)

    # self.readonly_storage causes remaining_space=0

    for shnum in sharenums:
        incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
        finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
        if os.path.exists(finalhome):
            # great! we already have it. easy.
            pass
        elif os.path.exists(incominghome):
            # Note that we don't create BucketWriters for shnums that
            # have a partial share (in incoming/), so if a second upload
            # occurs while the first is still in progress, the second
            # uploader will use different storage servers.
            pass
        elif (not limited) or (remaining_space >= max_space_per_bucket):
            # ok! we need to create the new share file.
            bw = BucketWriter(self, incominghome, finalhome,
                              max_space_per_bucket, lease_info, canary)
            if self.no_storage:
                bw.throw_out_all_data = True
            bucketwriters[shnum] = bw
            self._active_writers[bw] = 1
            if limited:
                remaining_space -= max_space_per_bucket
        else:
            # bummer! not enough space to accept this bucket
            pass

    if bucketwriters:
        fileutil.make_dirs(os.path.join(self.sharedir, si_dir))

    self.add_latency("allocate", time.time() - start)
    return alreadygot, bucketwriters
def _iter_share_files(self, storage_index):
    """Yield a share-file object (MutableShareFile or ShareFile) for
    each share we hold under this storage_index, skipping files whose
    header identifies them as neither kind."""
    for shnum, filename in self._get_bucket_shares(storage_index):
        f = open(filename, 'rb')
        header = f.read(32)
        f.close()
        if header[:32] == MutableShareFile.MAGIC:
            sf = MutableShareFile(filename, self)
            # note: if the share has been migrated, the renew_lease()
            # call will throw an exception, with information to help the
            # client update the lease.
        elif header[:4] == struct.pack(">L", 1):
            # immutable share file, layout version 1 (see file header)
            sf = ShareFile(filename)
        else:
            continue # non-sharefile
        yield sf
def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
                     owner_num=1):
    """Add (or renew) a one-month lease on every share we hold for this
    storage_index. Raises IndexError if we hold no shares for it."""
    start = time.time()
    self.count("add-lease")
    new_expire_time = time.time() + 31*24*60*60  # one month
    lease_info = LeaseInfo(owner_num,
                           renew_secret, cancel_secret,
                           new_expire_time, self.my_nodeid)
    found_buckets = False
    for sf in self._iter_share_files(storage_index):
        found_buckets = True
        # note: if the share has been migrated, the renew_lease()
        # call will throw an exception, with information to help the
        # client update the lease.
        sf.add_or_renew_lease(lease_info)
    self.add_latency("add-lease", time.time() - start)
    if not found_buckets:
        raise IndexError("no such storage index to do add-lease")
def remote_renew_lease(self, storage_index, renew_secret):
    """Extend an existing lease (matched by renew_secret) on every share
    of this bucket by one month. Raises IndexError if no share has a
    matching lease."""
    start = time.time()
    self.count("renew")
    new_expire_time = time.time() + 31*24*60*60  # one month
    found_buckets = False
    for sf in self._iter_share_files(storage_index):
        found_buckets = True
        sf.renew_lease(renew_secret, new_expire_time)
    self.add_latency("renew", time.time() - start)
    if not found_buckets:
        raise IndexError("no such lease to renew")
def remote_cancel_lease(self, storage_index, cancel_secret):
    """Cancel the lease (matched by cancel_secret) on every share of
    this bucket; shares with no remaining leases are deleted, and the
    bucket directory is removed once empty. Raises IndexError if we hold
    no shares for this storage_index."""
    start = time.time()
    self.count("cancel")

    total_space_freed = 0
    found_buckets = False
    for sf in self._iter_share_files(storage_index):
        # note: if we can't find a lease on one share, we won't bother
        # looking in the others. Unless something broke internally
        # (perhaps we ran out of disk space while adding a lease), the
        # leases on all shares will be identical.
        found_buckets = True
        # this raises IndexError if the lease wasn't present XXXX
        total_space_freed += sf.cancel_lease(cancel_secret)

    if found_buckets:
        storagedir = os.path.join(self.sharedir,
                                  storage_index_to_dir(storage_index))
        if not os.listdir(storagedir):
            os.rmdir(storagedir)

    if self.stats_provider:
        self.stats_provider.count('storage_server.bytes_freed',
                                  total_space_freed)
    self.add_latency("cancel", time.time() - start)
    if not found_buckets:
        raise IndexError("no such storage index")
def bucket_writer_closed(self, bw, consumed_size):
    # One of our BucketWriters has finished: report the bytes it wrote
    # to the stats provider (when we have one) and stop tracking it as
    # an active writer.
    provider = self.stats_provider
    if provider:
        provider.count('storage_server.bytes_added', consumed_size)
    self._active_writers.pop(bw)
def _get_bucket_shares(self, storage_index):
    """Yield (shnum, pathname) tuples for files that hold shares for
    this storage_index. In each tuple, 'shnum' will always be the
    integer form of the last component of 'pathname'. Entries whose
    names are not purely numeric (per NUM_RE) are skipped."""
    storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
    try:
        for f in os.listdir(storagedir):
            if NUM_RE.match(f):
                filename = os.path.join(storagedir, f)
                yield (int(f), filename)
    except OSError:
        # Commonly caused by there being no buckets at all.
        pass
def remote_get_buckets(self, storage_index):
    """Return a dict mapping each share number we hold for this
    storage_index to a BucketReader for that share. An empty dict means
    we hold no shares."""
    start = time.time()
    self.count("get")
    si_s = si_b2a(storage_index)
    log.msg("storage: get_buckets %s" % si_s)
    bucketreaders = {} # k: sharenum, v: BucketReader
    for shnum, filename in self._get_bucket_shares(storage_index):
        bucketreaders[shnum] = BucketReader(self, filename,
                                            storage_index, shnum)
    self.add_latency("get", time.time() - start)
    return bucketreaders
def get_leases(self, storage_index):
    """Provide an iterator that yields all of the leases attached to
    this bucket. Each lease is returned as a tuple of (owner_num,
    renew_secret, cancel_secret, expiration_time).

    This method is not for client use.
    """

    # since all shares get the same lease data, we just grab the leases
    # from the first share
    try:
        shnum, filename = self._get_bucket_shares(storage_index).next()
        sf = ShareFile(filename)
        return sf.iter_leases()
    except StopIteration:
        # no shares at all: no leases
        return iter([])
def remote_slot_testv_and_readv_and_writev(self, storage_index,
                                           secrets,
                                           test_and_write_vectors,
                                           read_vector):
    """Atomic test-and-set on a mutable slot.

    'secrets' is (write_enabler, renew_secret, cancel_secret); the
    write_enabler must match every existing share. Each entry of
    test_and_write_vectors is (testv, datav, new_length) per share
    number. Reads (read_vector) are gathered before any writes. Writes
    are applied only when every test vector passes; new_length == 0
    deletes the share. Returns (testv_is_good, read_data).
    """
    start = time.time()
    self.count("writev")
    si_s = si_b2a(storage_index)
    lp = log.msg("storage: slot_writev %s" % si_s)
    si_dir = storage_index_to_dir(storage_index)
    (write_enabler, renew_secret, cancel_secret) = secrets
    # shares exist if there is a file for them
    bucketdir = os.path.join(self.sharedir, si_dir)
    shares = {}
    if os.path.isdir(bucketdir):
        for sharenum_s in os.listdir(bucketdir):
            try:
                sharenum = int(sharenum_s)
            except ValueError:
                continue
            filename = os.path.join(bucketdir, sharenum_s)
            msf = MutableShareFile(filename, self)
            msf.check_write_enabler(write_enabler, si_s)
            shares[sharenum] = msf
    # write_enabler is good for all existing shares.

    # Now evaluate test vectors.
    testv_is_good = True
    for sharenum in test_and_write_vectors:
        (testv, datav, new_length) = test_and_write_vectors[sharenum]
        if sharenum in shares:
            if not shares[sharenum].check_testv(testv):
                self.log("testv failed: [%d]: %r" % (sharenum, testv))
                testv_is_good = False
                break
        else:
            # compare the vectors against an empty share, in which all
            # reads return empty strings.
            if not EmptyShare().check_testv(testv):
                self.log("testv failed (empty): [%d] %r" % (sharenum,
                                                            testv))
                testv_is_good = False
                break

    # now gather the read vectors, before we do any writes
    read_data = {}
    for sharenum, share in shares.items():
        read_data[sharenum] = share.readv(read_vector)

    ownerid = 1 # TODO
    expire_time = time.time() + 31*24*60*60   # one month
    lease_info = LeaseInfo(ownerid,
                           renew_secret, cancel_secret,
                           expire_time, self.my_nodeid)

    if testv_is_good:
        # now apply the write vectors
        for sharenum in test_and_write_vectors:
            (testv, datav, new_length) = test_and_write_vectors[sharenum]
            if new_length == 0:
                if sharenum in shares:
                    shares[sharenum].unlink()
            else:
                if sharenum not in shares:
                    # allocate a new share
                    allocated_size = 2000 # arbitrary, really
                    share = self._allocate_slot_share(bucketdir, secrets,
                                                      sharenum,
                                                      allocated_size,
                                                      owner_num=0)
                    shares[sharenum] = share
                shares[sharenum].writev(datav, new_length)
                # and update the lease
                shares[sharenum].add_or_renew_lease(lease_info)

        # NOTE: 'new_length' here is left over from the last loop
        # iteration, so this cleanup only fires when the final write
        # vector was a delete.
        if new_length == 0:
            # delete empty bucket directories
            if not os.listdir(bucketdir):
                os.rmdir(bucketdir)

    # all done
    self.add_latency("writev", time.time() - start)
    return (testv_is_good, read_data)
def _allocate_slot_share(self, bucketdir, secrets, sharenum,
                         allocated_size, owner_num=0):
    """Create and return a new mutable share file for 'sharenum' inside
    'bucketdir', initialized with our nodeid and the write enabler from
    'secrets' (write_enabler, renew_secret, cancel_secret)."""
    (write_enabler, renew_secret, cancel_secret) = secrets
    my_nodeid = self.my_nodeid
    fileutil.make_dirs(bucketdir)
    filename = os.path.join(bucketdir, "%d" % sharenum)
    share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
                                     self)
    return share
def remote_slot_readv(self, storage_index, shares, readv):
    """Read from the mutable shares of a slot.

    'shares' is a list of share numbers to read; an empty list means
    read all shares we hold. Returns a dict mapping share number to the
    data produced by applying 'readv' to that share.
    """
    start = time.time()
    self.count("readv")
    si_s = si_b2a(storage_index)
    lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
                 facility="tahoe.storage", level=log.OPERATIONAL)
    si_dir = storage_index_to_dir(storage_index)
    # shares exist if there is a file for them
    bucketdir = os.path.join(self.sharedir, si_dir)
    if not os.path.isdir(bucketdir):
        self.add_latency("readv", time.time() - start)
        return {}
    datavs = {}
    for sharenum_s in os.listdir(bucketdir):
        try:
            sharenum = int(sharenum_s)
        except ValueError:
            continue
        if sharenum in shares or not shares:
            filename = os.path.join(bucketdir, sharenum_s)
            msf = MutableShareFile(filename, self)
            datavs[sharenum] = msf.readv(readv)
    log.msg("returning shares %s" % (datavs.keys(),),
            facility="tahoe.storage", level=log.NOISY, parent=lp)
    self.add_latency("readv", time.time() - start)
    return datavs
def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
                                reason):
    """Record a client's corruption report as a file in our
    corruption-advisory directory and log it at SCARY level. 'reason'
    is free text supplied by the client."""
    fileutil.make_dirs(self.corruption_advisory_dir)
    now = time_format.iso_utc(sep="T")
    si_s = base32.b2a(storage_index)
    # windows can't handle colons in the filename
    fn = os.path.join(self.corruption_advisory_dir,
                      "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
    f = open(fn, "w")
    f.write("report: Share Corruption\n")
    f.write("type: %s\n" % share_type)
    f.write("storage_index: %s\n" % si_s)
    f.write("share_number: %d\n" % shnum)
    f.write("\n")
    f.write(reason)
    f.write("\n")
    f.close()
    log.msg(format=("client claims corruption in (%(share_type)s) " +
                    "%(si)s-%(shnum)d: %(reason)s"),
            share_type=share_type, si=si_s, shnum=shnum, reason=reason,
            level=log.SCARY, umid="SGx2fA")
    return None