1 import os, re, weakref, struct, time
3 from foolscap.api import Referenceable
4 from twisted.application import service
6 from zope.interface import implements
7 from allmydata.interfaces import RIStorageServer, IStatsProducer
8 from allmydata.util import fileutil, idlib, log, time_format
9 import allmydata # for __full_version__
11 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
12 _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
13 from allmydata.storage.lease import LeaseInfo
14 from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
15 create_mutable_sharefile
16 from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
17 from allmydata.storage.crawler import BucketCountingCrawler
18 from allmydata.storage.expirer import LeaseCheckingCrawler
21 # storage/shares/incoming
22 # incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
23 # be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
24 # storage/shares/$START/$STORAGEINDEX
25 # storage/shares/$START/$STORAGEINDEX/$SHARENUM
# Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
# base-32 characters, since each base-32 character encodes 5 bits).
30 # $SHARENUM matches this regex:
# $SHARENUM directory entries are pure decimal digit strings.
NUM_RE = re.compile(r"^[0-9]+$")
class StorageServer(service.MultiService, Referenceable):
    """Foolscap-exposed storage server that keeps share files on local disk.

    NOTE(review): several lines of this class body are missing from this
    excerpt (the embedded line numbering skips them), so some statements
    below appear without their enclosing context.
    """
    implements(RIStorageServer, IStatsProducer)
    # lease-expiration crawler class; kept as a class attribute so
    # subclasses/tests can substitute their own
    LeaseCheckerClass = LeaseCheckingCrawler
    # NOTE(review): in the surrounding (not visible) context this win32
    # import is presumably guarded by try/except ImportError -- confirm.
    import win32api, win32con
    # don't pop up a dialog box on critical file errors; see
    # <http://msdn.microsoft.com/en-us/library/ms680621%28VS.85%29.aspx>
    win32api.SetErrorMode(win32con.SEM_FAILCRITICALERRORS |
                          win32con.SEM_NOOPENFILEERRORBOX)
def __init__(self, storedir, nodeid, reserved_space=0,
             discard_storage=False, readonly_storage=False,
             expiration_enabled=False,
             expiration_mode="age",
             expiration_override_lease_duration=None,
             expiration_cutoff_date=None,
             expiration_sharetypes=("mutable", "immutable")):
    # NOTE(review): a `stats_provider=None` parameter appears to be
    # missing from the visible signature (it is assigned below) --
    # confirm against the full source.
    service.MultiService.__init__(self)
    # nodeid is the raw 20-byte node identifier, not base32
    assert isinstance(nodeid, str)
    assert len(nodeid) == 20
    self.my_nodeid = nodeid
    self.storedir = storedir
    sharedir = os.path.join(storedir, "shares")
    fileutil.make_dirs(sharedir)
    self.sharedir = sharedir
    # we don't actually create the corruption-advisory dir until necessary
    self.corruption_advisory_dir = os.path.join(storedir,
                                                "corruption-advisories")
    self.reserved_space = int(reserved_space)
    # discard_storage: accept uploads but throw the bytes away (testing)
    self.no_storage = discard_storage
    self.readonly_storage = readonly_storage
    self.stats_provider = stats_provider
    if self.stats_provider:
        self.stats_provider.register_producer(self)
    self.incomingdir = os.path.join(sharedir, 'incoming')
    # wipe any partial uploads left over from a previous run, then
    # re-create the (now empty) incoming/ area
    self._clean_incomplete()
    fileutil.make_dirs(self.incomingdir)
    # weak keys: a BucketWriter that goes away should not be counted
    self._active_writers = weakref.WeakKeyDictionary()
    log.msg("StorageServer created", facility="tahoe.storage")

    if self.get_available_space() is None:
        # NOTE(review): `umin=` looks like a typo for foolscap's `umid=`
        # keyword -- confirm.
        log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
                umin="0wZ27w", level=log.UNUSUAL)

    # per-category latency samples, consumed by get_latencies().
    # NOTE(review): several category entries (and the closing brace of
    # this dict literal) are missing from this excerpt.
    self.latencies = {"allocate": [], # immutable
                      "writev": [], # mutable
                      "add-lease": [], # both
    self.add_bucket_counter()

    # lease-expiration crawler: state/history files live next to shares/
    statefile = os.path.join(self.storedir, "lease_checker.state")
    historyfile = os.path.join(self.storedir, "lease_checker.history")
    klass = self.LeaseCheckerClass
    self.lease_checker = klass(self, statefile, historyfile,
                               expiration_enabled, expiration_mode,
                               expiration_override_lease_duration,
                               expiration_cutoff_date,
                               expiration_sharetypes)
    self.lease_checker.setServiceParent(self)
# NOTE(review): the `def __repr__(self):` line is not visible in this
# excerpt; this return statement is presumably the body of __repr__.
return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
def add_bucket_counter(self):
    """Create the bucket-counting crawler and start it under this service."""
    state_path = os.path.join(self.storedir, "bucket_counter.state")
    self.bucket_counter = BucketCountingCrawler(self, state_path)
    self.bucket_counter.setServiceParent(self)
def count(self, name, delta=1):
    """Bump the "storage_server.<name>" counter by *delta* on the stats
    provider, if one was configured; otherwise do nothing."""
    provider = self.stats_provider
    if not provider:
        return
    provider.count("storage_server." + name, delta)
def add_latency(self, category, latency):
    """Record one *latency* sample (in seconds) under *category*,
    keeping only the most recent 1000 samples per category.

    NOTE(review): the reviewed listing dropped the append/threshold
    lines (the sample was never stored and the list was truncated
    unconditionally); restored here.
    """
    samples = self.latencies[category]
    samples.append(latency)
    if len(samples) > 1000:
        # keep the tail: the most recent 1000 samples
        self.latencies[category] = samples[-1000:]
def get_latencies(self):
    """Return a dict, indexed by category, that contains a dict of
    latency numbers for each category. Each dict will contain the
    following keys: mean, 01_0_percentile, 10_0_percentile,
    50_0_percentile (median), 90_0_percentile, 95_0_percentile,
    99_0_percentile, 99_9_percentile. If no samples have been collected
    for the given category, then that category name will not be present
    in the return value.

    NOTE(review): the reviewed listing was missing the accumulator
    initialization, the sort, the sample count, and the final return;
    restored here to match the docstring's contract.
    """
    # note that Amazon's Dynamo paper says they use 99.9% percentile.
    output = {}
    for category in self.latencies:
        if not self.latencies[category]:
            # no samples collected: omit the category entirely
            continue
        stats = {}
        # percentiles index into sorted order, so sort a copy
        samples = self.latencies[category][:]
        samples.sort()
        count = len(samples)
        stats["mean"] = sum(samples) / count
        stats["01_0_percentile"] = samples[int(0.01 * count)]
        stats["10_0_percentile"] = samples[int(0.1 * count)]
        stats["50_0_percentile"] = samples[int(0.5 * count)]
        stats["90_0_percentile"] = samples[int(0.9 * count)]
        stats["95_0_percentile"] = samples[int(0.95 * count)]
        stats["99_0_percentile"] = samples[int(0.99 * count)]
        stats["99_9_percentile"] = samples[int(0.999 * count)]
        output[category] = stats
    return output
def log(self, *args, **kwargs):
    """Forward to the foolscap logger, defaulting facility to ours."""
    kwargs.setdefault("facility", "tahoe.storage")
    return log.msg(*args, **kwargs)
def _clean_incomplete(self):
    # Blow away the incoming/ area and everything in it; the caller
    # (startup) re-creates the empty directory afterwards.
    incoming = self.incomingdir
    fileutil.rm_dir(incoming)
def get_disk_stats(self):
    """Return disk statistics for the storage disk, in the form of a dict
    with the following fields.
      total:            total bytes on disk
      free_for_root:    bytes actually free on disk
      free_for_nonroot: bytes free for "a non-privileged user" [Unix] or
                        the current user [Windows]; might take into
                        account quotas depending on platform
      used:             bytes used on disk
      avail:            bytes available excluding reserved space
    An AttributeError can occur if the OS has no API to get disk information.
    An EnvironmentError can occur if the OS call fails."""

    # NOTE(review): the platform-selection lines (the if/else that picks
    # Windows vs. Unix) are missing from this excerpt; the two calls
    # below are alternative branches, not sequential statements --
    # confirm against the full source.

    # For Windows systems, where os.statvfs is not available, use GetDiskFreeSpaceEx.
    # <http://docs.activestate.com/activepython/2.5/pywin32/win32api__GetDiskFreeSpaceEx_meth.html>
    # Although the docs say that the argument should be the root directory
    # of a disk, GetDiskFreeSpaceEx actually accepts any path on that disk
    # (like its Win32 equivalent).
    (free_for_nonroot, total, free_for_root) = self.win32api.GetDiskFreeSpaceEx(self.storedir)

    # For Unix-like systems.
    # <http://docs.python.org/library/os.html#os.statvfs>
    # <http://opengroup.org/onlinepubs/7990989799/xsh/fstatvfs.html>
    # <http://opengroup.org/onlinepubs/7990989799/xsh/sysstatvfs.h.html>
    s = os.statvfs(self.storedir)

    # statvfs(2) is a wrapper around statfs(2).
    #   statvfs.f_frsize = statfs.f_bsize :
    #     "minimum unit of allocation" (statvfs)
    #     "fundamental file system block size" (statfs)
    #   statvfs.f_bsize = statfs.f_iosize = stat.st_blocks : preferred IO size
    # on an encrypted home directory ("FileVault"), it gets f_blocks
    # wrong, and s.f_blocks*s.f_frsize is twice the size of my disk,
    # but s.f_bavail*s.f_frsize is correct

    total = s.f_frsize * s.f_blocks
    free_for_root = s.f_frsize * s.f_bfree
    free_for_nonroot = s.f_frsize * s.f_bavail

    # valid for all platforms:
    used = total - free_for_root
    # never report negative availability when reserved_space exceeds
    # what is actually free
    avail = max(free_for_nonroot - self.reserved_space, 0)

    return { 'total': total, 'free_for_root': free_for_root,
             'free_for_nonroot': free_for_nonroot,
             'used': used, 'avail': avail, }
# NOTE(review): the "def get_stats(self):" header is missing from this
# excerpt, as are several interior lines (e.g. the "try:" matching the
# except clauses below, their bodies, the "if bucket_count:" guard and
# the final "return stats") -- confirm against the full source. The
# statements below build the dict handed to the stats gatherer.
# remember: RIStatsProvider requires that our return dict
# contains numeric values.
stats = { 'storage_server.allocated': self.allocated_size(), }
stats['storage_server.reserved_space'] = self.reserved_space
# flatten the per-category latency dicts into dotted keys
for category,ld in self.get_latencies().items():
    for name,v in ld.items():
        stats['storage_server.latencies.%s.%s' % (category, name)] = v

disk = self.get_disk_stats()
writeable = disk['avail'] > 0

# spacetime predictors should use disk_avail / (d(disk_used)/dt)
stats['storage_server.disk_total'] = disk['total']
stats['storage_server.disk_used'] = disk['used']
stats['storage_server.disk_free_for_root'] = disk['free_for_root']
stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
stats['storage_server.disk_avail'] = disk['avail']
# NOTE(review): the bodies of these two except clauses (presumably
# setting writeable = False) are not visible here.
except AttributeError:
except EnvironmentError:
    log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)

if self.readonly_storage:
    # a readonly server advertises no available space
    stats['storage_server.disk_avail'] = 0

stats['storage_server.accepting_immutable_shares'] = int(writeable)
# last-complete-bucket-count comes from the BucketCountingCrawler state
s = self.bucket_counter.get_state()
bucket_count = s.get("last-complete-bucket-count")
stats['storage_server.total_bucket_count'] = bucket_count
def get_available_space(self):
    """Returns available space for share storage in bytes, or None if no
    API to get this information is available.

    NOTE(review): the reviewed listing was missing the `try:` that pairs
    with the except clauses and the return statements; restored here to
    match the docstring's contract.
    """
    # a readonly server never accepts new shares: report zero space
    if self.readonly_storage:
        return 0
    try:
        return self.get_disk_stats()['avail']
    except AttributeError:
        # platform has no disk-stats API (see get_disk_stats)
        return None
    except EnvironmentError:
        # the OS call itself failed: log it and report no space
        log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
        return 0
def allocated_size(self):
    """Return the total number of bytes currently promised to the
    active BucketWriters (space reserved but possibly not yet written).

    NOTE(review): the reviewed listing was missing the accumulator
    initialization and the return statement; restored here.
    """
    space = 0
    for bw in self._active_writers:
        space += bw.allocated_size()
    return space
def remote_get_version(self):
    # advertise our protocol capabilities and size limits to clients
    remaining_space = self.get_available_space()
    if remaining_space is None:
        # We're on a platform that has no API to get disk stats.
        remaining_space = 2**64

    # NOTE(review): the closing braces of this dict literal and the
    # final "return version" are missing from this excerpt -- confirm
    # against the full source.
    version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                { "maximum-immutable-share-size": remaining_space,
                  "tolerates-immutable-read-overrun": True,
                  "delete-mutable-shares-with-zero-length-writev": True,
                "application-version": str(allmydata.__full_version__),
def remote_allocate_buckets(self, storage_index,
                            renew_secret, cancel_secret,
                            sharenums, allocated_size,
                            canary, owner_num=0):
    # owner_num is not for clients to set, but rather it should be
    # curried into the PersonalStorageServer instance that is dedicated
    # to a particular owner.
    #
    # NOTE(review): several lines of this method are missing from this
    # excerpt: the "start = time.time()" used at the bottom, the
    # "alreadygot = set()" accumulator, the "if limited:" guards around
    # the remaining_space bookkeeping, the "sf = ShareFile(fn)" that the
    # lease-renewal line operates on, the per-branch "pass" bodies, the
    # "if self.no_storage:" guard on throw_out_all_data, and the final
    # "else"/"if bucketwriters:" structure. Confirm against the full
    # source before editing.
    self.count("allocate")
    bucketwriters = {} # k: shnum, v: BucketWriter
    si_dir = storage_index_to_dir(storage_index)
    si_s = si_b2a(storage_index)

    log.msg("storage: allocate_buckets %s" % si_s)

    # in this implementation, the lease information (including secrets)
    # goes into the share files themselves. It could also be put into a
    # separate database. Note that the lease should not be added until
    # the BucketWriter has been closed.
    expire_time = time.time() + 31*24*60*60
    lease_info = LeaseInfo(owner_num,
                           renew_secret, cancel_secret,
                           expire_time, self.my_nodeid)

    max_space_per_bucket = allocated_size

    remaining_space = self.get_available_space()
    limited = remaining_space is not None
    # this is a bit conservative, since some of this allocated_size()
    # has already been written to disk, where it will show up in
    # get_available_space.
    remaining_space -= self.allocated_size()
    # self.readonly_storage causes remaining_space <= 0

    # fill alreadygot with all shares that we have, not just the ones
    # they asked about: this will save them a lot of work. Add or update
    # leases for all of them: if they want us to hold shares for this
    # file, they'll want us to hold leases for this file.
    for (shnum, fn) in self._get_bucket_shares(storage_index):
        alreadygot.add(shnum)
        sf.add_or_renew_lease(lease_info)

    for shnum in sharenums:
        incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
        finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
        if os.path.exists(finalhome):
            # great! we already have it. easy.
        elif os.path.exists(incominghome):
            # Note that we don't create BucketWriters for shnums that
            # have a partial share (in incoming/), so if a second upload
            # occurs while the first is still in progress, the second
            # uploader will use different storage servers.
        elif (not limited) or (remaining_space >= max_space_per_bucket):
            # ok! we need to create the new share file.
            bw = BucketWriter(self, incominghome, finalhome,
                              max_space_per_bucket, lease_info, canary)
            bw.throw_out_all_data = True
            bucketwriters[shnum] = bw
            self._active_writers[bw] = 1
            remaining_space -= max_space_per_bucket
            # bummer! not enough space to accept this bucket

    fileutil.make_dirs(os.path.join(self.sharedir, si_dir))
    self.add_latency("allocate", time.time() - start)
    return alreadygot, bucketwriters
def _iter_share_files(self, storage_index):
    # Yield a share-file object (MutableShareFile or ShareFile) for each
    # on-disk share of this storage index, sniffing the file header to
    # pick the right class.
    # NOTE(review): the lines that read the header bytes from `f`, close
    # the file, the final "else:" arm, and the "yield sf" are missing
    # from this excerpt -- confirm against the full source.
    for shnum, filename in self._get_bucket_shares(storage_index):
        f = open(filename, 'rb')
        if header[:32] == MutableShareFile.MAGIC:
            sf = MutableShareFile(filename, self)
            # note: if the share has been migrated, the renew_lease()
            # call will throw an exception, with information to help the
            # client update the lease.
        elif header[:4] == struct.pack(">L", 1):
            # immutable shares start with a big-endian version field of 1
            sf = ShareFile(filename)
            continue # non-sharefile
# NOTE(review): the continuation line of this signature (presumably an
# owner_num parameter), the "start = time.time()" used at the bottom,
# and any trailing return are missing from this excerpt.
def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
    self.count("add-lease")
    # leases are good for 31 days from now
    new_expire_time = time.time() + 31*24*60*60
    lease_info = LeaseInfo(owner_num,
                           renew_secret, cancel_secret,
                           new_expire_time, self.my_nodeid)
    # add/renew the lease on every share we hold for this storage index
    for sf in self._iter_share_files(storage_index):
        sf.add_or_renew_lease(lease_info)
    self.add_latency("add-lease", time.time() - start)
def remote_renew_lease(self, storage_index, renew_secret):
    # NOTE(review): "start = time.time()", the counter bump, and the
    # line setting found_buckets = True inside the loop are missing from
    # this excerpt -- confirm against the full source.
    new_expire_time = time.time() + 31*24*60*60
    found_buckets = False
    for sf in self._iter_share_files(storage_index):
        # renew_lease raises if no lease matches renew_secret
        sf.renew_lease(renew_secret, new_expire_time)
    self.add_latency("renew", time.time() - start)
    if not found_buckets:
        raise IndexError("no such lease to renew")
def remote_cancel_lease(self, storage_index, cancel_secret):
    # Cancel our lease on every share of this storage index; when the
    # last share directory ends up empty it should be removed.
    # NOTE(review): "start = time.time()", the counter bump, the line
    # setting found_buckets = True, the os.rmdir() under the empty-dir
    # check, the second argument of the bytes_freed count() call, and
    # the final return are missing from this excerpt -- confirm.
    total_space_freed = 0
    found_buckets = False
    for sf in self._iter_share_files(storage_index):
        # note: if we can't find a lease on one share, we won't bother
        # looking in the others. Unless something broke internally
        # (perhaps we ran out of disk space while adding a lease), the
        # leases on all shares will be identical.
        # this raises IndexError if the lease wasn't present XXXX
        total_space_freed += sf.cancel_lease(cancel_secret)

    storagedir = os.path.join(self.sharedir,
                              storage_index_to_dir(storage_index))
    if not os.listdir(storagedir):

    if self.stats_provider:
        self.stats_provider.count('storage_server.bytes_freed',
    self.add_latency("cancel", time.time() - start)
    if not found_buckets:
        raise IndexError("no such storage index")
def bucket_writer_closed(self, bw, consumed_size):
    """Called by a BucketWriter when it finishes: record the bytes it
    consumed on the stats provider (if any) and retire the writer."""
    provider = self.stats_provider
    if provider:
        provider.count('storage_server.bytes_added', consumed_size)
    del self._active_writers[bw]
def _get_bucket_shares(self, storage_index):
    """Return a list of (shnum, pathname) tuples for files that hold
    shares for this storage_index. In each tuple, 'shnum' will always be
    the integer form of the last component of 'pathname'."""
    # (despite the docstring, this is written as a generator)
    storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
    # NOTE(review): the "try:" that pairs with the OSError comment
    # below, a NUM_RE filter on the directory entries, and the except
    # clause itself are missing from this excerpt -- confirm.
    for f in os.listdir(storagedir):
        filename = os.path.join(storagedir, f)
        yield (int(f), filename)
    # Commonly caused by there being no buckets at all.
def remote_get_buckets(self, storage_index):
    # Hand back a BucketReader for every share we hold for this index.
    # NOTE(review): "start = time.time()", the counter bump, and the
    # final "return bucketreaders" are missing from this excerpt.
    si_s = si_b2a(storage_index)
    log.msg("storage: get_buckets %s" % si_s)
    bucketreaders = {} # k: sharenum, v: BucketReader
    for shnum, filename in self._get_bucket_shares(storage_index):
        bucketreaders[shnum] = BucketReader(self, filename,
                                            storage_index, shnum)
    self.add_latency("get", time.time() - start)
def get_leases(self, storage_index):
    """Provide an iterator that yields all of the leases attached to this
    bucket. Each lease is returned as a LeaseInfo instance.

    This method is not for client use.
    """
    # since all shares get the same lease data, we just grab the leases
    # from the first share
    # NOTE(review): the "try:" guarding the .next() call and the
    # StopIteration fallback (presumably returning an empty iterator)
    # are missing from this excerpt -- confirm against the full source.
    shnum, filename = self._get_bucket_shares(storage_index).next()
    sf = ShareFile(filename)
    return sf.get_leases()
    except StopIteration:
# NOTE(review): this method is heavily elided in this excerpt. Missing
# pieces include: the "secrets," and "read_vector):" signature
# continuation lines, "start = time.time()" and the counter bump, the
# "shares = {}" and "read_data = {}" accumulators, the try/except
# around int(sharenum_s), "testv_is_good = True", the "else:" that
# introduces the empty-share comparison, the truncated log-format
# continuation, the definition of `ownerid`, the "if testv_is_good:"
# guard around the write phase, the new_length handling around unlink,
# the remaining _allocate_slot_share() arguments, and the os.rmdir()
# under the empty-dir check. Confirm all of these against the full
# source before editing.
def remote_slot_testv_and_readv_and_writev(self, storage_index,
                                           test_and_write_vectors,
    si_s = si_b2a(storage_index)
    log.msg("storage: slot_writev %s" % si_s)
    si_dir = storage_index_to_dir(storage_index)
    (write_enabler, renew_secret, cancel_secret) = secrets
    # shares exist if there is a file for them
    bucketdir = os.path.join(self.sharedir, si_dir)
    if os.path.isdir(bucketdir):
        for sharenum_s in os.listdir(bucketdir):
            sharenum = int(sharenum_s)
            filename = os.path.join(bucketdir, sharenum_s)
            msf = MutableShareFile(filename, self)
            # raises BadWriteEnablerError (via the share) on mismatch
            msf.check_write_enabler(write_enabler, si_s)
            shares[sharenum] = msf
    # write_enabler is good for all existing shares.

    # Now evaluate test vectors.
    for sharenum in test_and_write_vectors:
        (testv, datav, new_length) = test_and_write_vectors[sharenum]
        if sharenum in shares:
            if not shares[sharenum].check_testv(testv):
                self.log("testv failed: [%d]: %r" % (sharenum, testv))
                testv_is_good = False
            # compare the vectors against an empty share, in which all
            # reads return empty strings.
            if not EmptyShare().check_testv(testv):
                self.log("testv failed (empty): [%d] %r" % (sharenum,
                testv_is_good = False

    # now gather the read vectors, before we do any writes
    for sharenum, share in shares.items():
        read_data[sharenum] = share.readv(read_vector)

    # leases on mutable shares are good for one month from now
    expire_time = time.time() + 31*24*60*60 # one month
    lease_info = LeaseInfo(ownerid,
                           renew_secret, cancel_secret,
                           expire_time, self.my_nodeid)

    # now apply the write vectors
    for sharenum in test_and_write_vectors:
        (testv, datav, new_length) = test_and_write_vectors[sharenum]
        if sharenum in shares:
            shares[sharenum].unlink()
        if sharenum not in shares:
            # allocate a new share
            allocated_size = 2000 # arbitrary, really
            share = self._allocate_slot_share(bucketdir, secrets,
            shares[sharenum] = share
        shares[sharenum].writev(datav, new_length)
        # and update the lease
        shares[sharenum].add_or_renew_lease(lease_info)

    # delete empty bucket directories
    if not os.listdir(bucketdir):

    self.add_latency("writev", time.time() - start)
    return (testv_is_good, read_data)
def _allocate_slot_share(self, bucketdir, secrets, sharenum,
                         allocated_size, owner_num=0):
    # Create a brand-new mutable share file inside bucketdir.
    # NOTE(review): the remaining arguments to create_mutable_sharefile
    # and the trailing "return share" are missing from this excerpt --
    # confirm against the full source.
    (write_enabler, renew_secret, cancel_secret) = secrets
    my_nodeid = self.my_nodeid
    fileutil.make_dirs(bucketdir)
    filename = os.path.join(bucketdir, "%d" % sharenum)
    share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
def remote_slot_readv(self, storage_index, shares, readv):
    # Read the given vectors from each requested mutable share (or from
    # all shares, when `shares` is empty).
    # NOTE(review): "start = time.time()", the counter bump, the
    # "datavs = {}" accumulator, the early "return {}" for a missing
    # bucketdir, the try/except around int(sharenum_s), and the final
    # "return datavs" are missing from this excerpt -- confirm.
    si_s = si_b2a(storage_index)
    lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
                 facility="tahoe.storage", level=log.OPERATIONAL)
    si_dir = storage_index_to_dir(storage_index)
    # shares exist if there is a file for them
    bucketdir = os.path.join(self.sharedir, si_dir)
    if not os.path.isdir(bucketdir):
        self.add_latency("readv", time.time() - start)
    for sharenum_s in os.listdir(bucketdir):
        sharenum = int(sharenum_s)
        # empty `shares` means "read every share we hold"
        if sharenum in shares or not shares:
            filename = os.path.join(bucketdir, sharenum_s)
            msf = MutableShareFile(filename, self)
            datavs[sharenum] = msf.readv(readv)
    log.msg("returning shares %s" % (datavs.keys(),),
            facility="tahoe.storage", level=log.NOISY, parent=lp)
    self.add_latency("readv", time.time() - start)
# NOTE(review): the signature continuation (presumably "reason):"), the
# open() of the advisory file `f`, and the trailing writes/close of that
# file are missing from this excerpt -- confirm against the full source.
def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
    # write a corruption-advisory report file, then log loudly
    fileutil.make_dirs(self.corruption_advisory_dir)
    now = time_format.iso_utc(sep="T")
    si_s = si_b2a(storage_index)
    # windows can't handle colons in the filename
    fn = os.path.join(self.corruption_advisory_dir,
                      "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
    f.write("report: Share Corruption\n")
    f.write("type: %s\n" % share_type)
    f.write("storage_index: %s\n" % si_s)
    f.write("share_number: %d\n" % shnum)
    log.msg(format=("client claims corruption in (%(share_type)s) " +
                    "%(si)s-%(shnum)d: %(reason)s"),
            share_type=share_type, si=si_s, shnum=shnum, reason=reason,
            level=log.SCARY, umid="SGx2fA")