1 import os, re, weakref, struct, time
3 from foolscap import Referenceable
4 from twisted.application import service
6 from zope.interface import implements
7 from allmydata.interfaces import RIStorageServer, IStatsProducer
8 from allmydata.util import fileutil, log, time_format
9 import allmydata # for __full_version__
11 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
12 _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
13 from allmydata.storage.lease import LeaseInfo
14 from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
15 create_mutable_sharefile
16 from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
17 from allmydata.storage.crawler import BucketCountingCrawler
18 from allmydata.storage.expirer import LeaseCheckingCrawler
21 # storage/shares/incoming
22 # incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
23 # be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
24 # storage/shares/$START/$STORAGEINDEX
25 # storage/shares/$START/$STORAGEINDEX/$SHARENUM
27 # Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
30 # $SHARENUM matches this regex:
# NUM_RE accepts only filenames that are pure decimal digits, i.e. valid
# share numbers; anything else in a bucket directory is skipped.
31 NUM_RE=re.compile("^[0-9]+$")
# A Twisted MultiService that is also remotely reachable over foolscap
# (Referenceable). Python 2 / old zope.interface style: `implements(...)`
# inside the class body declares the interfaces this class provides.
35 class StorageServer(service.MultiService, Referenceable):
36 implements(RIStorageServer, IStatsProducer)
# Class attribute (overridable by subclasses) naming the crawler class
# that add_lease_checker() instantiates.
38 LeaseCheckerClass = LeaseCheckingCrawler
# Construct the storage server rooted at `storedir`.
# NOTE(review): this is a numbered listing with lines elided; in particular
# the signature line declaring `stats_provider` is missing (it is used
# below without a visible parameter) — confirm against the full file.
40 def __init__(self, storedir, nodeid, reserved_space=0,
41 discard_storage=False, readonly_storage=False,
43 expiration_enabled=False,
44 expiration_mode=("age", 31*24*60*60)):
45 service.MultiService.__init__(self)
# nodeid is a 20-byte binary node identifier (Python 2 `str` == bytes).
46 assert isinstance(nodeid, str)
47 assert len(nodeid) == 20
48 self.my_nodeid = nodeid
49 self.storedir = storedir
50 sharedir = os.path.join(storedir, "shares")
51 fileutil.make_dirs(sharedir)
52 self.sharedir = sharedir
53 # we don't actually create the corruption-advisory dir until necessary
54 self.corruption_advisory_dir = os.path.join(storedir,
55 "corruption-advisories")
56 self.reserved_space = int(reserved_space)
57 self.no_storage = discard_storage
58 self.readonly_storage = readonly_storage
59 self.stats_provider = stats_provider
60 if self.stats_provider:
61 self.stats_provider.register_producer(self)
# Wipe incoming/ (leftover partial uploads) before recreating it.
62 self.incomingdir = os.path.join(sharedir, 'incoming')
63 self._clean_incomplete()
64 fileutil.make_dirs(self.incomingdir)
# Weak keys: a BucketWriter that is garbage-collected drops out of the
# active set automatically.
65 self._active_writers = weakref.WeakKeyDictionary()
66 lp = log.msg("StorageServer created", facility="tahoe.storage")
69 if self.get_available_space() is None:
70 log.msg("warning: [storage]reserved_space= is set, but this platform does not support statvfs(2), so this reservation cannot be honored",
# NOTE(review): `umin=` looks like a typo for `umid=` (cf. the
# `umid="SGx2fA"` usage later in this file) — worth fixing upstream.
71 umin="0wZ27w", level=log.UNUSUAL)
# Per-category latency samples; this listing elides several of the
# category entries and the dict's closing brace.
73 self.latencies = {"allocate": [], # immutable
78 "writev": [], # mutable
80 "add-lease": [], # both
84 self.add_bucket_counter()
85 self.add_lease_checker(expiration_enabled, expiration_mode)
87 def add_bucket_counter(self):
88 statefile = os.path.join(self.storedir, "bucket_counter.state")
89 self.bucket_counter = BucketCountingCrawler(self, statefile)
90 self.bucket_counter.setServiceParent(self)
92 def add_lease_checker(self, expiration_enabled, expiration_mode):
93 statefile = os.path.join(self.storedir, "lease_checker.state")
94 historyfile = os.path.join(self.storedir, "lease_checker.history")
95 klass = self.LeaseCheckerClass
96 self.lease_checker = klass(self, statefile, historyfile,
97 expiration_enabled=expiration_enabled,
98 expiration_mode=expiration_mode)
99 self.lease_checker.setServiceParent(self)
101 def count(self, name, delta=1):
102 if self.stats_provider:
103 self.stats_provider.count("storage_server." + name, delta)
# Record one latency sample (seconds) for `category` and cap the stored
# list at the most recent 1000 samples.
# NOTE(review): this listing elides the lines between — presumably the
# `a.append(latency)` (and possibly a length guard) — confirm against
# the full file.
105 def add_latency(self, category, latency):
106 a = self.latencies[category]
109 self.latencies[category] = a[-1000:]
111 def get_latencies(self):
112 """Return a dict, indexed by category, that contains a dict of
113 latency numbers for each category. Each dict will contain the
114 following keys: mean, 01_0_percentile, 10_0_percentile,
115 50_0_percentile (median), 90_0_percentile, 95_0_percentile,
116 99_0_percentile, 99_9_percentile. If no samples have been collected
117 for the given category, then that category name will not be present
118 in the return value."""
119 # note that Amazon's Dynamo paper says they use 99.9% percentile.
# NOTE(review): this listing elides several lines: the `output = {}`
# initialization, the `continue` for empty categories, the
# `samples.sort()` / `count = len(samples)` / `stats = {}` setup,
# and the final `return output` — confirm against the full file.
121 for category in self.latencies:
122 if not self.latencies[category]:
# Work on a copy so sorting does not disturb the stored sample list.
125 samples = self.latencies[category][:]
128 stats["mean"] = sum(samples) / count
# Percentiles are read from the sorted sample list by index; with few
# samples the high percentiles collapse onto the last element.
129 stats["01_0_percentile"] = samples[int(0.01 * count)]
130 stats["10_0_percentile"] = samples[int(0.1 * count)]
131 stats["50_0_percentile"] = samples[int(0.5 * count)]
132 stats["90_0_percentile"] = samples[int(0.9 * count)]
133 stats["95_0_percentile"] = samples[int(0.95 * count)]
134 stats["99_0_percentile"] = samples[int(0.99 * count)]
135 stats["99_9_percentile"] = samples[int(0.999 * count)]
136 output[category] = stats
139 def log(self, *args, **kwargs):
140 if "facility" not in kwargs:
141 kwargs["facility"] = "tahoe.storage"
142 return log.msg(*args, **kwargs)
144 def _clean_incomplete(self):
145 fileutil.rm_dir(self.incomingdir)
147 def do_statvfs(self):
148 return os.statvfs(self.storedir)
# NOTE(review): the enclosing `def get_stats(self):` line is elided from
# this listing (the span below is the method's interior), as are the
# `try:` wrapping the statvfs work, the `writeable` initialization, and
# the method's `return stats` — confirm against the full file.
151 # remember: RIStatsProvider requires that our return dict
152 # contains numeric values.
153 stats = { 'storage_server.allocated': self.allocated_size(), }
154 stats["storage_server.reserved_space"] = self.reserved_space
# Flatten per-category latency dicts into dotted stat names.
155 for category,ld in self.get_latencies().items():
156 for name,v in ld.items():
157 stats['storage_server.latencies.%s.%s' % (category, name)] = v
159 if self.readonly_storage:
162 s = self.do_statvfs()
164 # statvfs(2) is a wrapper around statfs(2).
165 # statvfs.f_frsize = statfs.f_bsize :
166 # "minimum unit of allocation" (statvfs)
167 # "fundamental file system block size" (statfs)
168 # statvfs.f_bsize = statfs.f_iosize = stat.st_blocks : preferred IO size
169 # on an encrypted home directory ("FileVault"), it gets f_blocks
170 # wrong, and s.f_blocks*s.f_frsize is twice the size of my disk,
171 # but s.f_bavail*s.f_frsize is correct
173 disk_total = s.f_frsize * s.f_blocks
174 disk_used = s.f_frsize * (s.f_blocks - s.f_bfree)
175 # spacetime predictors should look at the slope of disk_used.
176 disk_free_for_root = s.f_frsize * s.f_bfree
177 disk_free_for_nonroot = s.f_frsize * s.f_bavail
179 # include our local policy here: if we stop accepting shares when
180 # the available space drops below 1GB, then include that fact in
# Clamp to zero: reserved_space may exceed what is actually free.
182 disk_avail = disk_free_for_nonroot - self.reserved_space
183 disk_avail = max(disk_avail, 0)
184 if self.readonly_storage:
189 # spacetime predictors should use disk_avail / (d(disk_used)/dt)
190 stats["storage_server.disk_total"] = disk_total
191 stats["storage_server.disk_used"] = disk_used
192 stats["storage_server.disk_free_for_root"] = disk_free_for_root
193 stats["storage_server.disk_free_for_nonroot"] = disk_free_for_nonroot
194 stats["storage_server.disk_avail"] = disk_avail
195 except AttributeError:
196 # os.statvfs is available only on unix
# `writeable` is consumed here but its assignment is elided above.
198 stats["storage_server.accepting_immutable_shares"] = int(writeable)
199 s = self.bucket_counter.get_state()
# May be None if the first bucket-count crawl has not finished yet.
200 bucket_count = s.get("last-complete-bucket-count")
202 stats["storage_server.total_bucket_count"] = bucket_count
# Return the bytes available to non-root users on the filesystem holding
# directory `d`.
# NOTE(review): the binding of `s` (presumably `s = os.statvfs(d)`) and
# the `return disk_avail` are elided from this listing — confirm.
207 def stat_disk(self, d):
208 # s.f_bavail: available to non-root users
209 disk_avail = s.f_frsize * s.f_bavail
212 def get_available_space(self):
213 # returns None if it cannot be measured (windows)
# NOTE(review): the `try:` line, the `return None` in the AttributeError
# branch, and the final return are elided from this listing — confirm.
215 disk_avail = self.stat_disk(self.storedir)
# Honor the operator's reserved-space setting.
216 disk_avail -= self.reserved_space
217 except AttributeError:
219 if self.readonly_storage:
# Sum the space promised to all in-flight BucketWriters.
# NOTE(review): the `space = 0` initialization and `return space` are
# elided from this listing — confirm against the full file.
223 def allocated_size(self):
225 for bw in self._active_writers:
226 space += bw.allocated_size()
# Report this server's protocol capabilities to a remote client.
229 def remote_get_version(self):
230 remaining_space = self.get_available_space()
231 if remaining_space is None:
232 # we're on a platform that doesn't have 'df', so make a vague
# 2**64 acts as "effectively unlimited" when space can't be measured.
234 remaining_space = 2**64
# NOTE(review): the dict's remaining entries, closing braces, and the
# `return version` are elided from this listing — confirm.
235 version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
236 { "maximum-immutable-share-size": remaining_space,
237 "tolerates-immutable-read-overrun": True,
238 "delete-mutable-shares-with-zero-length-writev": True,
240 "application-version": str(allmydata.__full_version__),
# Remote entry point: reserve space for immutable shares of one file.
# Returns (alreadygot, bucketwriters): the share numbers already stored
# here, plus a BucketWriter for each new share we agreed to accept.
# NOTE(review): this listing elides several lines, including the
# `start = time.time()` and `alreadygot = set()` initializations, the
# creation of `sf` used in the lease-renewal loop, and the
# readonly/limited bookkeeping — confirm against the full file.
244 def remote_allocate_buckets(self, storage_index,
245 renew_secret, cancel_secret,
246 sharenums, allocated_size,
247 canary, owner_num=0):
248 # owner_num is not for clients to set, but rather it should be
249 # curried into the PersonalStorageServer instance that is dedicated
250 # to a particular owner.
252 self.count("allocate")
254 bucketwriters = {} # k: shnum, v: BucketWriter
255 si_dir = storage_index_to_dir(storage_index)
256 si_s = si_b2a(storage_index)
258 log.msg("storage: allocate_buckets %s" % si_s)
260 # in this implementation, the lease information (including secrets)
261 # goes into the share files themselves. It could also be put into a
262 # separate database. Note that the lease should not be added until
263 # the BucketWriter has been closed.
# Leases last one month from now.
264 expire_time = time.time() + 31*24*60*60
265 lease_info = LeaseInfo(owner_num,
266 renew_secret, cancel_secret,
267 expire_time, self.my_nodeid)
269 max_space_per_bucket = allocated_size
# remaining_space is None exactly when the platform can't measure disk
# space; `limited` gates all space accounting below.
271 remaining_space = self.get_available_space()
272 limited = remaining_space is not None
274 # this is a bit conservative, since some of this allocated_size()
275 # has already been written to disk, where it will show up in
276 # get_available_space.
277 remaining_space -= self.allocated_size()
279 # fill alreadygot with all shares that we have, not just the ones
280 # they asked about: this will save them a lot of work. Add or update
281 # leases for all of them: if they want us to hold shares for this
282 # file, they'll want us to hold leases for this file.
283 for (shnum, fn) in self._get_bucket_shares(storage_index):
284 alreadygot.add(shnum)
286 sf.add_or_renew_lease(lease_info)
288 # self.readonly_storage causes remaining_space=0
290 for shnum in sharenums:
291 incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
292 finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
293 if os.path.exists(finalhome):
294 # great! we already have it. easy.
296 elif os.path.exists(incominghome):
297 # Note that we don't create BucketWriters for shnums that
298 # have a partial share (in incoming/), so if a second upload
299 # occurs while the first is still in progress, the second
300 # uploader will use different storage servers.
302 elif (not limited) or (remaining_space >= max_space_per_bucket):
303 # ok! we need to create the new share file.
304 bw = BucketWriter(self, incominghome, finalhome,
305 max_space_per_bucket, lease_info, canary)
# no_storage (discard mode) keeps the protocol but drops the bytes.
307 bw.throw_out_all_data = True
308 bucketwriters[shnum] = bw
309 self._active_writers[bw] = 1
311 remaining_space -= max_space_per_bucket
313 # bummer! not enough space to accept this bucket
# Ensure the final bucket directory exists even if nothing was written.
317 fileutil.make_dirs(os.path.join(self.sharedir, si_dir))
319 self.add_latency("allocate", time.time() - start)
320 return alreadygot, bucketwriters
# Yield a share-file object (MutableShareFile or ShareFile) for each
# share we hold under `storage_index`, sniffing the on-disk header to
# pick the right class.
# NOTE(review): this listing elides the header read (presumably
# `header = f.read(32)` plus `f.close()`), the `else:` introducing the
# `continue`, and the `yield sf` — confirm against the full file.
322 def _iter_share_files(self, storage_index):
323 for shnum, filename in self._get_bucket_shares(storage_index):
324 f = open(filename, 'rb')
327 if header[:32] == MutableShareFile.MAGIC:
328 sf = MutableShareFile(filename, self)
329 # note: if the share has been migrated, the renew_lease()
330 # call will throw an exception, with information to help the
331 # client update the lease.
# Immutable share files start with a big-endian uint32 version == 1.
332 elif header[:4] == struct.pack(">L", 1):
333 sf = ShareFile(filename)
335 continue # non-sharefile
# Remote entry point: add (or renew) a one-month lease on every share we
# hold for `storage_index`.
# NOTE(review): the rest of the signature (presumably `owner_num=...`)
# and the `start = time.time()` line are elided from this listing.
338 def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
341 self.count("add-lease")
342 new_expire_time = time.time() + 31*24*60*60
343 lease_info = LeaseInfo(owner_num,
344 renew_secret, cancel_secret,
345 new_expire_time, self.my_nodeid)
346 for sf in self._iter_share_files(storage_index):
347 sf.add_or_renew_lease(lease_info)
348 self.add_latency("add-lease", time.time() - start)
# Remote entry point: renew an existing lease (identified by its
# renew_secret) on every share of `storage_index`; IndexError if we hold
# no shares for it.
# NOTE(review): this listing elides the `start = time.time()` line and
# the line setting `found_buckets = True` inside the loop — confirm.
351 def remote_renew_lease(self, storage_index, renew_secret):
354 new_expire_time = time.time() + 31*24*60*60
355 found_buckets = False
356 for sf in self._iter_share_files(storage_index):
358 sf.renew_lease(renew_secret, new_expire_time)
359 self.add_latency("renew", time.time() - start)
360 if not found_buckets:
361 raise IndexError("no such lease to renew")
# Remote entry point: cancel a lease on every share of `storage_index`,
# deleting shares whose last lease is gone; IndexError if no shares.
# NOTE(review): this listing elides several lines, including the
# `start = time.time()` setup, `found_buckets = True`, the rmdir of the
# emptied bucket directory, and the bytes-freed argument to
# stats_provider.count — confirm against the full file.
363 def remote_cancel_lease(self, storage_index, cancel_secret):
367 total_space_freed = 0
368 found_buckets = False
369 for sf in self._iter_share_files(storage_index):
370 # note: if we can't find a lease on one share, we won't bother
371 # looking in the others. Unless something broke internally
372 # (perhaps we ran out of disk space while adding a lease), the
373 # leases on all shares will be identical.
375 # this raises IndexError if the lease wasn't present XXXX
376 total_space_freed += sf.cancel_lease(cancel_secret)
379 storagedir = os.path.join(self.sharedir,
380 storage_index_to_dir(storage_index))
381 if not os.listdir(storagedir):
384 if self.stats_provider:
385 self.stats_provider.count('storage_server.bytes_freed',
387 self.add_latency("cancel", time.time() - start)
388 if not found_buckets:
389 raise IndexError("no such storage index")
391 def bucket_writer_closed(self, bw, consumed_size):
392 if self.stats_provider:
393 self.stats_provider.count('storage_server.bytes_added', consumed_size)
394 del self._active_writers[bw]
396 def _get_bucket_shares(self, storage_index):
397 """Return a list of (shnum, pathname) tuples for files that hold
398 shares for this storage_index. In each tuple, 'shnum' will always be
399 the integer form of the last component of 'pathname'."""
# NOTE(review): this listing elides the `try:` around the listdir, the
# NUM_RE filename filter, and the `except OSError` line that the
# trailing comment belongs to — confirm against the full file.
400 storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
402 for f in os.listdir(storagedir):
404 filename = os.path.join(storagedir, f)
405 yield (int(f), filename)
407 # Commonly caused by there being no buckets at all.
# Remote entry point: return {shnum: BucketReader} for every immutable
# share we hold under `storage_index`.
# NOTE(review): the `start = time.time()` line and the final
# `return bucketreaders` are elided from this listing — confirm.
410 def remote_get_buckets(self, storage_index):
413 si_s = si_b2a(storage_index)
414 log.msg("storage: get_buckets %s" % si_s)
415 bucketreaders = {} # k: sharenum, v: BucketReader
416 for shnum, filename in self._get_bucket_shares(storage_index):
417 bucketreaders[shnum] = BucketReader(self, filename,
418 storage_index, shnum)
419 self.add_latency("get", time.time() - start)
422 def get_leases(self, storage_index):
423 """Provide an iterator that yields all of the leases attached to this
424 bucket. Each lease is returned as a LeaseInfo instance.
426 This method is not for client use.
# NOTE(review): the docstring terminator, the `try:` line, and the
# StopIteration handler body (presumably returning an empty iterator)
# are elided from this listing — confirm against the full file.
429 # since all shares get the same lease data, we just grab the leases
430 # from the first share
# `.next()` is the Python 2 iterator protocol; raises StopIteration
# when there are no shares at all.
432 shnum, filename = self._get_bucket_shares(storage_index).next()
433 sf = ShareFile(filename)
434 return sf.get_leases()
435 except StopIteration:
# Remote entry point implementing mutable-share test-and-set: check the
# write enabler, evaluate the supplied test vectors against existing
# shares, gather read vectors, and (only if all tests pass) apply the
# write vectors and refresh leases. Returns (testv_is_good, read_data).
# NOTE(review): this listing elides parts of the signature (`secrets`,
# `read_vector`), the `start`/`si`/`shares = {}`/`testv_is_good = True`/
# `read_data = {}` initializations, the `else:` branches, the
# `if testv_is_good:` guard before the writes, the new_length==0 delete
# path, and the bucketdir cleanup — confirm against the full file.
438 def remote_slot_testv_and_readv_and_writev(self, storage_index,
440 test_and_write_vectors,
444 si_s = si_b2a(storage_index)
445 lp = log.msg("storage: slot_writev %s" % si_s)
446 si_dir = storage_index_to_dir(storage_index)
447 (write_enabler, renew_secret, cancel_secret) = secrets
448 # shares exist if there is a file for them
449 bucketdir = os.path.join(self.sharedir, si_dir)
451 if os.path.isdir(bucketdir):
452 for sharenum_s in os.listdir(bucketdir):
454 sharenum = int(sharenum_s)
457 filename = os.path.join(bucketdir, sharenum_s)
458 msf = MutableShareFile(filename, self)
# Raises if the enabler is wrong; nothing is modified in that case.
459 msf.check_write_enabler(write_enabler, si_s)
460 shares[sharenum] = msf
461 # write_enabler is good for all existing shares.
463 # Now evaluate test vectors.
465 for sharenum in test_and_write_vectors:
466 (testv, datav, new_length) = test_and_write_vectors[sharenum]
467 if sharenum in shares:
468 if not shares[sharenum].check_testv(testv):
469 self.log("testv failed: [%d]: %r" % (sharenum, testv))
470 testv_is_good = False
473 # compare the vectors against an empty share, in which all
474 # reads return empty strings.
475 if not EmptyShare().check_testv(testv):
476 self.log("testv failed (empty): [%d] %r" % (sharenum,
478 testv_is_good = False
481 # now gather the read vectors, before we do any writes
483 for sharenum, share in shares.items():
484 read_data[sharenum] = share.readv(read_vector)
487 expire_time = time.time() + 31*24*60*60 # one month
488 lease_info = LeaseInfo(ownerid,
489 renew_secret, cancel_secret,
490 expire_time, self.my_nodeid)
493 # now apply the write vectors
494 for sharenum in test_and_write_vectors:
495 (testv, datav, new_length) = test_and_write_vectors[sharenum]
497 if sharenum in shares:
498 shares[sharenum].unlink()
500 if sharenum not in shares:
501 # allocate a new share
502 allocated_size = 2000 # arbitrary, really
503 share = self._allocate_slot_share(bucketdir, secrets,
507 shares[sharenum] = share
508 shares[sharenum].writev(datav, new_length)
509 # and update the lease
510 shares[sharenum].add_or_renew_lease(lease_info)
513 # delete empty bucket directories
514 if not os.listdir(bucketdir):
519 self.add_latency("writev", time.time() - start)
520 return (testv_is_good, read_data)
# Create a brand-new mutable share file in `bucketdir` for `sharenum`.
# NOTE(review): this listing elides the remaining arguments to
# create_mutable_sharefile and the `return share` line — confirm.
522 def _allocate_slot_share(self, bucketdir, secrets, sharenum,
523 allocated_size, owner_num=0):
524 (write_enabler, renew_secret, cancel_secret) = secrets
525 my_nodeid = self.my_nodeid
526 fileutil.make_dirs(bucketdir)
527 filename = os.path.join(bucketdir, "%d" % sharenum)
528 share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
# Remote entry point: read `readv` vectors from the requested mutable
# shares (or from all shares we hold, when `shares` is empty/falsy).
# NOTE(review): this listing elides the `start = time.time()` and
# `datavs = {}` initializations, the early `return datavs` in the
# no-bucketdir case, the NUM_RE filename filter, and the final
# `return datavs` — confirm against the full file.
532 def remote_slot_readv(self, storage_index, shares, readv):
535 si_s = si_b2a(storage_index)
536 lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
537 facility="tahoe.storage", level=log.OPERATIONAL)
538 si_dir = storage_index_to_dir(storage_index)
539 # shares exist if there is a file for them
540 bucketdir = os.path.join(self.sharedir, si_dir)
541 if not os.path.isdir(bucketdir):
542 self.add_latency("readv", time.time() - start)
545 for sharenum_s in os.listdir(bucketdir):
547 sharenum = int(sharenum_s)
# Empty `shares` means "all of them".
550 if sharenum in shares or not shares:
551 filename = os.path.join(bucketdir, sharenum_s)
552 msf = MutableShareFile(filename, self)
553 datavs[sharenum] = msf.readv(readv)
554 log.msg("returning shares %s" % (datavs.keys(),),
555 facility="tahoe.storage", level=log.NOISY, parent=lp)
556 self.add_latency("readv", time.time() - start)
# Remote entry point: record a client's corruption report as a small
# text file under corruption-advisories/ and log it loudly.
# NOTE(review): this listing elides the `reason` parameter on the
# signature, the `f = open(fn, "w")` line, and the lines writing the
# reason and closing `f` — confirm against the full file.
559 def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
561 fileutil.make_dirs(self.corruption_advisory_dir)
562 now = time_format.iso_utc(sep="T")
563 si_s = si_b2a(storage_index)
564 # windows can't handle colons in the filename
565 fn = os.path.join(self.corruption_advisory_dir,
566 "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
568 f.write("report: Share Corruption\n")
569 f.write("type: %s\n" % share_type)
570 f.write("storage_index: %s\n" % si_s)
571 f.write("share_number: %d\n" % shnum)
576 log.msg(format=("client claims corruption in (%(share_type)s) " +
577 "%(si)s-%(shnum)d: %(reason)s"),
578 share_type=share_type, si=si_s, shnum=shnum, reason=reason,
579 level=log.SCARY, umid="SGx2fA")