1 import os, re, weakref, struct, time
3 from foolscap.api import Referenceable
4 from twisted.application import service
6 from zope.interface import implements
7 from allmydata.interfaces import RIStorageServer, IStatsProducer
8 from allmydata.util import fileutil, log, time_format
9 import allmydata # for __full_version__
11 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
12 _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
13 from allmydata.storage.lease import LeaseInfo
14 from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
15 create_mutable_sharefile
16 from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
17 from allmydata.storage.crawler import BucketCountingCrawler
18 from allmydata.storage.expirer import LeaseCheckingCrawler
# Share-file layout on disk:
# storage/shares/incoming
#   incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
#   be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
# storage/shares/$START/$STORAGEINDEX
# storage/shares/$START/$STORAGEINDEX/$SHARENUM

# Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
# base-32 characters).

# $SHARENUM matches this regex: filenames in a bucket directory that are not
# pure decimal digits are ignored by _get_bucket_shares().
NUM_RE=re.compile("^[0-9]+$")
class StorageServer(service.MultiService, Referenceable):
    """A local storage server, reachable over Foolscap as RIStorageServer.

    Immutable and mutable shares are kept under <storedir>/shares;
    partially-uploaded shares live under shares/incoming until their
    BucketWriter is closed.
    """
    implements(RIStorageServer, IStatsProducer)
    LeaseCheckerClass = LeaseCheckingCrawler

    # Keep win32api/win32con as class attributes when pywin32 is available
    # (get_disk_stats reads self.win32api); an unguarded import here would
    # raise ImportError at class-definition time on non-Windows platforms,
    # so it must be wrapped in try/except.
    try:
        import win32api, win32con
        # Disable Windows' "critical error" popup dialogs so that e.g. a
        # missing disk raises an exception instead of blocking on a dialog.
        # <http://msdn.microsoft.com/en-us/library/ms680621%28VS.85%29.aspx>
        win32api.SetErrorMode(win32con.SEM_FAILCRITICALERRORS |
                              win32con.SEM_NOOPENFILEERRORBOX)
    except ImportError:
        pass
50 def __init__(self, storedir, nodeid, reserved_space=0,
51 discard_storage=False, readonly_storage=False,
53 expiration_enabled=False,
54 expiration_mode="age",
55 expiration_override_lease_duration=None,
56 expiration_cutoff_date=None,
57 expiration_sharetypes=("mutable", "immutable")):
58 service.MultiService.__init__(self)
59 assert isinstance(nodeid, str)
60 assert len(nodeid) == 20
61 self.my_nodeid = nodeid
62 self.storedir = storedir
63 sharedir = os.path.join(storedir, "shares")
64 fileutil.make_dirs(sharedir)
65 self.sharedir = sharedir
66 # we don't actually create the corruption-advisory dir until necessary
67 self.corruption_advisory_dir = os.path.join(storedir,
68 "corruption-advisories")
69 self.reserved_space = int(reserved_space)
70 self.no_storage = discard_storage
71 self.readonly_storage = readonly_storage
72 self.stats_provider = stats_provider
73 if self.stats_provider:
74 self.stats_provider.register_producer(self)
75 self.incomingdir = os.path.join(sharedir, 'incoming')
76 self._clean_incomplete()
77 fileutil.make_dirs(self.incomingdir)
78 self._active_writers = weakref.WeakKeyDictionary()
79 log.msg("StorageServer created", facility="tahoe.storage")
82 if self.get_available_space() is None:
83 log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
84 umin="0wZ27w", level=log.UNUSUAL)
86 self.latencies = {"allocate": [], # immutable
91 "writev": [], # mutable
93 "add-lease": [], # both
97 self.add_bucket_counter()
99 statefile = os.path.join(self.storedir, "lease_checker.state")
100 historyfile = os.path.join(self.storedir, "lease_checker.history")
101 klass = self.LeaseCheckerClass
102 self.lease_checker = klass(self, statefile, historyfile,
103 expiration_enabled, expiration_mode,
104 expiration_override_lease_duration,
105 expiration_cutoff_date,
106 expiration_sharetypes)
107 self.lease_checker.setServiceParent(self)
109 def add_bucket_counter(self):
110 statefile = os.path.join(self.storedir, "bucket_counter.state")
111 self.bucket_counter = BucketCountingCrawler(self, statefile)
112 self.bucket_counter.setServiceParent(self)
114 def count(self, name, delta=1):
115 if self.stats_provider:
116 self.stats_provider.count("storage_server." + name, delta)
118 def add_latency(self, category, latency):
119 a = self.latencies[category]
122 self.latencies[category] = a[-1000:]
124 def get_latencies(self):
125 """Return a dict, indexed by category, that contains a dict of
126 latency numbers for each category. Each dict will contain the
127 following keys: mean, 01_0_percentile, 10_0_percentile,
128 50_0_percentile (median), 90_0_percentile, 95_0_percentile,
129 99_0_percentile, 99_9_percentile. If no samples have been collected
130 for the given category, then that category name will not be present
131 in the return value."""
132 # note that Amazon's Dynamo paper says they use 99.9% percentile.
134 for category in self.latencies:
135 if not self.latencies[category]:
138 samples = self.latencies[category][:]
141 stats["mean"] = sum(samples) / count
142 stats["01_0_percentile"] = samples[int(0.01 * count)]
143 stats["10_0_percentile"] = samples[int(0.1 * count)]
144 stats["50_0_percentile"] = samples[int(0.5 * count)]
145 stats["90_0_percentile"] = samples[int(0.9 * count)]
146 stats["95_0_percentile"] = samples[int(0.95 * count)]
147 stats["99_0_percentile"] = samples[int(0.99 * count)]
148 stats["99_9_percentile"] = samples[int(0.999 * count)]
149 output[category] = stats
152 def log(self, *args, **kwargs):
153 if "facility" not in kwargs:
154 kwargs["facility"] = "tahoe.storage"
155 return log.msg(*args, **kwargs)
    def _clean_incomplete(self):
        # Discard any partially-uploaded shares left over from a previous
        # run; __init__ re-creates the (now empty) incoming directory
        # immediately after calling this.
        fileutil.rm_dir(self.incomingdir)
    def get_disk_stats(self):
        """Return disk statistics for the storage disk, in the form of a dict
        with the following fields.
          total:            total bytes on disk
          free_for_root:    bytes actually free on disk
          free_for_nonroot: bytes free for "a non-privileged user" [Unix] or
                            the current user [Windows]; might take into
                            account quotas depending on platform
          used:             bytes used on disk
          avail:            bytes available excluding reserved space
        An AttributeError can occur if the OS has no API to get disk information.
        An EnvironmentError can occur if the OS call fails."""

        # NOTE(review): a platform conditional (Windows vs. POSIX) appears to
        # have been lost from this extraction -- as written, both branches
        # below run unconditionally, and on non-Windows hosts self.win32api
        # does not exist, so the first line raises AttributeError before
        # os.statvfs is ever reached. TODO: confirm against upstream and
        # restore the branch structure.

        # For Windows systems, where os.statvfs is not available, use GetDiskFreeSpaceEx.
        # <http://docs.activestate.com/activepython/2.5/pywin32/win32api__GetDiskFreeSpaceEx_meth.html>

        # Although the docs say that the argument should be the root directory
        # of a disk, GetDiskFreeSpaceEx actually accepts any path on that disk
        # (like its Win32 equivalent).

        (free_for_nonroot, total, free_for_root) = self.win32api.GetDiskFreeSpaceEx(self.storedir)

        # For Unix-like systems.
        # <http://docs.python.org/library/os.html#os.statvfs>
        # <http://opengroup.org/onlinepubs/7990989799/xsh/fstatvfs.html>
        # <http://opengroup.org/onlinepubs/7990989799/xsh/sysstatvfs.h.html>
        s = os.statvfs(self.storedir)

        # statvfs(2) is a wrapper around statfs(2).
        #   statvfs.f_frsize = statfs.f_bsize :
        #     "minimum unit of allocation" (statvfs)
        #     "fundamental file system block size" (statfs)
        #   statvfs.f_bsize = statfs.f_iosize = stat.st_blocks : preferred IO size
        # on an encrypted home directory ("FileVault"), it gets f_blocks
        # wrong, and s.f_blocks*s.f_frsize is twice the size of my disk,
        # but s.f_bavail*s.f_frsize is correct

        total = s.f_frsize * s.f_blocks
        free_for_root = s.f_frsize * s.f_bfree
        free_for_nonroot = s.f_frsize * s.f_bavail

        # valid for all platforms:
        used = total - free_for_root
        avail = max(free_for_nonroot - self.reserved_space, 0)

        return { 'total': total, 'free_for_root': free_for_root,
                 'free_for_nonroot': free_for_nonroot,
                 'used': used, 'avail': avail, }
212 # remember: RIStatsProvider requires that our return dict
213 # contains numeric values.
214 stats = { 'storage_server.allocated': self.allocated_size(), }
215 stats['storage_server.reserved_space'] = self.reserved_space
216 for category,ld in self.get_latencies().items():
217 for name,v in ld.items():
218 stats['storage_server.latencies.%s.%s' % (category, name)] = v
221 disk = self.get_disk_stats()
222 writeable = disk['avail'] > 0
224 # spacetime predictors should use disk_avail / (d(disk_used)/dt)
225 stats['storage_server.disk_total'] = disk['total']
226 stats['storage_server.disk_used'] = disk['used']
227 stats['storage_server.disk_free_for_root'] = disk['free_for_root']
228 stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
229 stats['storage_server.disk_avail'] = disk['avail']
230 except AttributeError:
232 except EnvironmentError:
233 log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
236 if self.readonly_storage:
237 stats['storage_server.disk_avail'] = 0
240 stats['storage_server.accepting_immutable_shares'] = int(writeable)
241 s = self.bucket_counter.get_state()
242 bucket_count = s.get("last-complete-bucket-count")
244 stats['storage_server.total_bucket_count'] = bucket_count
247 def get_available_space(self):
248 """Returns available space for share storage in bytes, or None if no
249 API to get this information is available."""
251 if self.readonly_storage:
254 return self.get_disk_stats()['avail']
255 except AttributeError:
257 except EnvironmentError:
258 log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
261 def allocated_size(self):
263 for bw in self._active_writers:
264 space += bw.allocated_size()
267 def remote_get_version(self):
268 remaining_space = self.get_available_space()
269 if remaining_space is None:
270 # We're on a platform that has no API to get disk stats.
271 remaining_space = 2**64
273 version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
274 { "maximum-immutable-share-size": remaining_space,
275 "tolerates-immutable-read-overrun": True,
276 "delete-mutable-shares-with-zero-length-writev": True,
278 "application-version": str(allmydata.__full_version__),
282 def remote_allocate_buckets(self, storage_index,
283 renew_secret, cancel_secret,
284 sharenums, allocated_size,
285 canary, owner_num=0):
286 # owner_num is not for clients to set, but rather it should be
287 # curried into the PersonalStorageServer instance that is dedicated
288 # to a particular owner.
290 self.count("allocate")
292 bucketwriters = {} # k: shnum, v: BucketWriter
293 si_dir = storage_index_to_dir(storage_index)
294 si_s = si_b2a(storage_index)
296 log.msg("storage: allocate_buckets %s" % si_s)
298 # in this implementation, the lease information (including secrets)
299 # goes into the share files themselves. It could also be put into a
300 # separate database. Note that the lease should not be added until
301 # the BucketWriter has been closed.
302 expire_time = time.time() + 31*24*60*60
303 lease_info = LeaseInfo(owner_num,
304 renew_secret, cancel_secret,
305 expire_time, self.my_nodeid)
307 max_space_per_bucket = allocated_size
309 remaining_space = self.get_available_space()
310 limited = remaining_space is not None
312 # this is a bit conservative, since some of this allocated_size()
313 # has already been written to disk, where it will show up in
314 # get_available_space.
315 remaining_space -= self.allocated_size()
316 # self.readonly_storage causes remaining_space <= 0
318 # fill alreadygot with all shares that we have, not just the ones
319 # they asked about: this will save them a lot of work. Add or update
320 # leases for all of them: if they want us to hold shares for this
321 # file, they'll want us to hold leases for this file.
322 for (shnum, fn) in self._get_bucket_shares(storage_index):
323 alreadygot.add(shnum)
325 sf.add_or_renew_lease(lease_info)
327 for shnum in sharenums:
328 incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
329 finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
330 if os.path.exists(finalhome):
331 # great! we already have it. easy.
333 elif os.path.exists(incominghome):
334 # Note that we don't create BucketWriters for shnums that
335 # have a partial share (in incoming/), so if a second upload
336 # occurs while the first is still in progress, the second
337 # uploader will use different storage servers.
339 elif (not limited) or (remaining_space >= max_space_per_bucket):
340 # ok! we need to create the new share file.
341 bw = BucketWriter(self, incominghome, finalhome,
342 max_space_per_bucket, lease_info, canary)
344 bw.throw_out_all_data = True
345 bucketwriters[shnum] = bw
346 self._active_writers[bw] = 1
348 remaining_space -= max_space_per_bucket
350 # bummer! not enough space to accept this bucket
354 fileutil.make_dirs(os.path.join(self.sharedir, si_dir))
356 self.add_latency("allocate", time.time() - start)
357 return alreadygot, bucketwriters
359 def _iter_share_files(self, storage_index):
360 for shnum, filename in self._get_bucket_shares(storage_index):
361 f = open(filename, 'rb')
364 if header[:32] == MutableShareFile.MAGIC:
365 sf = MutableShareFile(filename, self)
366 # note: if the share has been migrated, the renew_lease()
367 # call will throw an exception, with information to help the
368 # client update the lease.
369 elif header[:4] == struct.pack(">L", 1):
370 sf = ShareFile(filename)
372 continue # non-sharefile
375 def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
378 self.count("add-lease")
379 new_expire_time = time.time() + 31*24*60*60
380 lease_info = LeaseInfo(owner_num,
381 renew_secret, cancel_secret,
382 new_expire_time, self.my_nodeid)
383 for sf in self._iter_share_files(storage_index):
384 sf.add_or_renew_lease(lease_info)
385 self.add_latency("add-lease", time.time() - start)
388 def remote_renew_lease(self, storage_index, renew_secret):
391 new_expire_time = time.time() + 31*24*60*60
392 found_buckets = False
393 for sf in self._iter_share_files(storage_index):
395 sf.renew_lease(renew_secret, new_expire_time)
396 self.add_latency("renew", time.time() - start)
397 if not found_buckets:
398 raise IndexError("no such lease to renew")
400 def remote_cancel_lease(self, storage_index, cancel_secret):
404 total_space_freed = 0
405 found_buckets = False
406 for sf in self._iter_share_files(storage_index):
407 # note: if we can't find a lease on one share, we won't bother
408 # looking in the others. Unless something broke internally
409 # (perhaps we ran out of disk space while adding a lease), the
410 # leases on all shares will be identical.
412 # this raises IndexError if the lease wasn't present XXXX
413 total_space_freed += sf.cancel_lease(cancel_secret)
416 storagedir = os.path.join(self.sharedir,
417 storage_index_to_dir(storage_index))
418 if not os.listdir(storagedir):
421 if self.stats_provider:
422 self.stats_provider.count('storage_server.bytes_freed',
424 self.add_latency("cancel", time.time() - start)
425 if not found_buckets:
426 raise IndexError("no such storage index")
428 def bucket_writer_closed(self, bw, consumed_size):
429 if self.stats_provider:
430 self.stats_provider.count('storage_server.bytes_added', consumed_size)
431 del self._active_writers[bw]
433 def _get_bucket_shares(self, storage_index):
434 """Return a list of (shnum, pathname) tuples for files that hold
435 shares for this storage_index. In each tuple, 'shnum' will always be
436 the integer form of the last component of 'pathname'."""
437 storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
439 for f in os.listdir(storagedir):
441 filename = os.path.join(storagedir, f)
442 yield (int(f), filename)
444 # Commonly caused by there being no buckets at all.
447 def remote_get_buckets(self, storage_index):
450 si_s = si_b2a(storage_index)
451 log.msg("storage: get_buckets %s" % si_s)
452 bucketreaders = {} # k: sharenum, v: BucketReader
453 for shnum, filename in self._get_bucket_shares(storage_index):
454 bucketreaders[shnum] = BucketReader(self, filename,
455 storage_index, shnum)
456 self.add_latency("get", time.time() - start)
459 def get_leases(self, storage_index):
460 """Provide an iterator that yields all of the leases attached to this
461 bucket. Each lease is returned as a LeaseInfo instance.
463 This method is not for client use.
466 # since all shares get the same lease data, we just grab the leases
467 # from the first share
469 shnum, filename = self._get_bucket_shares(storage_index).next()
470 sf = ShareFile(filename)
471 return sf.get_leases()
472 except StopIteration:
475 def remote_slot_testv_and_readv_and_writev(self, storage_index,
477 test_and_write_vectors,
481 si_s = si_b2a(storage_index)
482 log.msg("storage: slot_writev %s" % si_s)
483 si_dir = storage_index_to_dir(storage_index)
484 (write_enabler, renew_secret, cancel_secret) = secrets
485 # shares exist if there is a file for them
486 bucketdir = os.path.join(self.sharedir, si_dir)
488 if os.path.isdir(bucketdir):
489 for sharenum_s in os.listdir(bucketdir):
491 sharenum = int(sharenum_s)
494 filename = os.path.join(bucketdir, sharenum_s)
495 msf = MutableShareFile(filename, self)
496 msf.check_write_enabler(write_enabler, si_s)
497 shares[sharenum] = msf
498 # write_enabler is good for all existing shares.
500 # Now evaluate test vectors.
502 for sharenum in test_and_write_vectors:
503 (testv, datav, new_length) = test_and_write_vectors[sharenum]
504 if sharenum in shares:
505 if not shares[sharenum].check_testv(testv):
506 self.log("testv failed: [%d]: %r" % (sharenum, testv))
507 testv_is_good = False
510 # compare the vectors against an empty share, in which all
511 # reads return empty strings.
512 if not EmptyShare().check_testv(testv):
513 self.log("testv failed (empty): [%d] %r" % (sharenum,
515 testv_is_good = False
518 # now gather the read vectors, before we do any writes
520 for sharenum, share in shares.items():
521 read_data[sharenum] = share.readv(read_vector)
524 expire_time = time.time() + 31*24*60*60 # one month
525 lease_info = LeaseInfo(ownerid,
526 renew_secret, cancel_secret,
527 expire_time, self.my_nodeid)
530 # now apply the write vectors
531 for sharenum in test_and_write_vectors:
532 (testv, datav, new_length) = test_and_write_vectors[sharenum]
534 if sharenum in shares:
535 shares[sharenum].unlink()
537 if sharenum not in shares:
538 # allocate a new share
539 allocated_size = 2000 # arbitrary, really
540 share = self._allocate_slot_share(bucketdir, secrets,
544 shares[sharenum] = share
545 shares[sharenum].writev(datav, new_length)
546 # and update the lease
547 shares[sharenum].add_or_renew_lease(lease_info)
550 # delete empty bucket directories
551 if not os.listdir(bucketdir):
556 self.add_latency("writev", time.time() - start)
557 return (testv_is_good, read_data)
559 def _allocate_slot_share(self, bucketdir, secrets, sharenum,
560 allocated_size, owner_num=0):
561 (write_enabler, renew_secret, cancel_secret) = secrets
562 my_nodeid = self.my_nodeid
563 fileutil.make_dirs(bucketdir)
564 filename = os.path.join(bucketdir, "%d" % sharenum)
565 share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
569 def remote_slot_readv(self, storage_index, shares, readv):
572 si_s = si_b2a(storage_index)
573 lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
574 facility="tahoe.storage", level=log.OPERATIONAL)
575 si_dir = storage_index_to_dir(storage_index)
576 # shares exist if there is a file for them
577 bucketdir = os.path.join(self.sharedir, si_dir)
578 if not os.path.isdir(bucketdir):
579 self.add_latency("readv", time.time() - start)
582 for sharenum_s in os.listdir(bucketdir):
584 sharenum = int(sharenum_s)
587 if sharenum in shares or not shares:
588 filename = os.path.join(bucketdir, sharenum_s)
589 msf = MutableShareFile(filename, self)
590 datavs[sharenum] = msf.readv(readv)
591 log.msg("returning shares %s" % (datavs.keys(),),
592 facility="tahoe.storage", level=log.NOISY, parent=lp)
593 self.add_latency("readv", time.time() - start)
596 def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
598 fileutil.make_dirs(self.corruption_advisory_dir)
599 now = time_format.iso_utc(sep="T")
600 si_s = si_b2a(storage_index)
601 # windows can't handle colons in the filename
602 fn = os.path.join(self.corruption_advisory_dir,
603 "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
605 f.write("report: Share Corruption\n")
606 f.write("type: %s\n" % share_type)
607 f.write("storage_index: %s\n" % si_s)
608 f.write("share_number: %d\n" % shnum)
613 log.msg(format=("client claims corruption in (%(share_type)s) " +
614 "%(si)s-%(shnum)d: %(reason)s"),
615 share_type=share_type, si=si_s, shnum=shnum, reason=reason,
616 level=log.SCARY, umid="SGx2fA")