1 import os, re, weakref, struct, time
3 from foolscap.api import Referenceable
4 from twisted.application import service
6 from zope.interface import implements
7 from allmydata.interfaces import RIStorageServer, IStatsProducer
8 from allmydata.util import fileutil, idlib, log, time_format
9 import allmydata # for __full_version__
11 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
12 _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
13 from allmydata.storage.lease import LeaseInfo
14 from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
15 create_mutable_sharefile
16 from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
17 from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
18 from allmydata.storage.crawler import BucketCountingCrawler
19 from allmydata.storage.expirer import LeaseCheckingCrawler
# On-disk layout, rooted at the node's storage directory:
# storage/shares/incoming
# incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
# be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
# storage/shares/$START/$STORAGEINDEX
# storage/shares/$START/$STORAGEINDEX/$SHARENUM

# Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
# base-32 characters).

# $SHARENUM matches this regex:
NUM_RE=re.compile("^[0-9]+$")
class StorageServer(service.MultiService, Referenceable):
    # Remotely-reachable storage server: exposes the RIStorageServer
    # foolscap interface and feeds counters to a stats provider
    # (IStatsProducer).
    implements(RIStorageServer, IStatsProducer)
    # Lease-expiration crawler class, kept as a class attribute --
    # presumably so subclasses/tests can substitute another implementation.
    LeaseCheckerClass = LeaseCheckingCrawler
    def __init__(self, storedir, nodeid, reserved_space=0,
                 discard_storage=False, readonly_storage=False,
                 expiration_enabled=False,
                 expiration_mode="age",
                 expiration_override_lease_duration=None,
                 expiration_cutoff_date=None,
                 expiration_sharetypes=("mutable", "immutable")):
        """Create a StorageServer rooted at 'storedir'.

        storedir: base directory for all persistent state (share files,
            crawler state, corruption advisories).
        nodeid: this server's binary node id; must be a 20-byte string.
        reserved_space: bytes to keep free on the disk holding storedir.
        discard_storage: if true, accept uploads but discard the data
            (recorded as self.no_storage).
        readonly_storage: if true, do not accept new shares.
        expiration_*: passed through to the lease-checking crawler.
        """
        service.MultiService.__init__(self)
        assert isinstance(nodeid, str)
        assert len(nodeid) == 20
        self.my_nodeid = nodeid
        self.storedir = storedir
        sharedir = os.path.join(storedir, "shares")
        fileutil.make_dirs(sharedir)
        self.sharedir = sharedir
        # we don't actually create the corruption-advisory dir until necessary
        self.corruption_advisory_dir = os.path.join(storedir,
                                                    "corruption-advisories")
        self.reserved_space = int(reserved_space)
        self.no_storage = discard_storage
        self.readonly_storage = readonly_storage
        # NOTE(review): 'stats_provider' is not among the parameters visible
        # in this signature -- confirm it is declared (e.g. stats_provider=None).
        self.stats_provider = stats_provider
        if self.stats_provider:
            self.stats_provider.register_producer(self)
        self.incomingdir = os.path.join(sharedir, 'incoming')
        # discard leftover partial uploads, then recreate incoming/ empty
        self._clean_incomplete()
        fileutil.make_dirs(self.incomingdir)
        # BucketWriter -> 1 for each upload in progress; weak keys so
        # abandoned writers drop out automatically
        self._active_writers = weakref.WeakKeyDictionary()
        log.msg("StorageServer created", facility="tahoe.storage")

        if self.get_available_space() is None:
            # NOTE(review): log.msg elsewhere in this file uses umid=; this
            # umin= looks like a typo -- confirm.
            log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
                    umin="0wZ27w", level=log.UNUSUAL)

        # per-category lists of recent latency samples, in seconds
        # NOTE(review): this dict literal appears truncated here (other
        # categories and the closing brace are not visible) -- confirm.
        self.latencies = {"allocate": [], # immutable
                          "writev": [], # mutable
                          "add-lease": [], # both
        self.add_bucket_counter()

        statefile = os.path.join(self.storedir, "lease_checker.state")
        historyfile = os.path.join(self.storedir, "lease_checker.history")
        klass = self.LeaseCheckerClass
        self.lease_checker = klass(self, statefile, historyfile,
                                   expiration_enabled, expiration_mode,
                                   expiration_override_lease_duration,
                                   expiration_cutoff_date,
                                   expiration_sharetypes)
        self.lease_checker.setServiceParent(self)
        # (body of __repr__; def line not visible here) identify the server
        # by its abbreviated base-32 nodeid
        return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
103 def have_shares(self):
104 # quick test to decide if we need to commit to an implicit
105 # permutation-seed or if we should use a new one
106 return bool(set(os.listdir(self.sharedir)) - set(["incoming"]))
108 def add_bucket_counter(self):
109 statefile = os.path.join(self.storedir, "bucket_counter.state")
110 self.bucket_counter = BucketCountingCrawler(self, statefile)
111 self.bucket_counter.setServiceParent(self)
113 def count(self, name, delta=1):
114 if self.stats_provider:
115 self.stats_provider.count("storage_server." + name, delta)
    def add_latency(self, category, latency):
        """Record one latency sample (in seconds) for the given operation
        category (a key of self.latencies)."""
        a = self.latencies[category]
        # NOTE(review): the append of 'latency' (and presumably a length
        # guard) is not visible here -- confirm. The line below bounds the
        # list to the newest 1000 samples.
            self.latencies[category] = a[-1000:]
    def get_latencies(self):
        """Return a dict, indexed by category, that contains a dict of
        latency numbers for each category. If there are sufficient samples
        for unambiguous interpretation, each dict will contain the
        following keys: mean, 01_0_percentile, 10_0_percentile,
        50_0_percentile (median), 90_0_percentile, 95_0_percentile,
        99_0_percentile, 99_9_percentile. If there are insufficient
        samples for a given percentile to be interpreted unambiguously
        that percentile will be reported as None. If no samples have been
        collected for the given category, then that category name will
        not be present in the return value. """
        # note that Amazon's Dynamo paper says they use 99.9% percentile.
        for category in self.latencies:
            # categories with no samples are omitted from the output
            if not self.latencies[category]:
            # work on a copy; the percentiles index into the sorted copy
            samples = self.latencies[category][:]
            stats["samplesize"] = count
            stats["mean"] = sum(samples) / count
            # (fraction, output key, minimum sample count for an
            #  unambiguous value at that percentile)
            orderstatlist = [(0.01, "01_0_percentile", 100), (0.1, "10_0_percentile", 10),\
                             (0.50, "50_0_percentile", 10), (0.90, "90_0_percentile", 10),\
                             (0.95, "95_0_percentile", 20), (0.99, "99_0_percentile", 100),\
                             (0.999, "99_9_percentile", 1000)]
            for percentile, percentilestring, minnumtoobserve in orderstatlist:
                if count >= minnumtoobserve:
                    # order statistic at index floor(p*n) of sorted samples
                    stats[percentilestring] = samples[int(percentile*count)]
                    # NOTE(review): this assignment presumably belongs to an
                    # 'else:' branch (insufficient samples -> None) -- confirm.
                    stats[percentilestring] = None
            output[category] = stats
163 def log(self, *args, **kwargs):
164 if "facility" not in kwargs:
165 kwargs["facility"] = "tahoe.storage"
166 return log.msg(*args, **kwargs)
    def _clean_incomplete(self):
        # Discard partially-uploaded shares left over from a previous run;
        # the caller (__init__) recreates incoming/ empty afterwards.
        fileutil.rm_dir(self.incomingdir)
        # (body of the stats-producer method; its def line is not visible
        # here) -- builds a flat dict of numeric counters.
        # remember: RIStatsProvider requires that our return dict
        # contains numeric values.
        stats = { 'storage_server.allocated': self.allocated_size(), }
        stats['storage_server.reserved_space'] = self.reserved_space
        # flatten the per-category latency dicts into dotted counter names
        for category,ld in self.get_latencies().items():
            for name,v in ld.items():
                stats['storage_server.latencies.%s.%s' % (category, name)] = v

            # NOTE(review): the 'try:' that these lines sit under is not
            # visible here -- confirm block structure.
            disk = fileutil.get_disk_stats(self.sharedir, self.reserved_space)
            writeable = disk['avail'] > 0

            # spacetime predictors should use disk_avail / (d(disk_used)/dt)
            stats['storage_server.disk_total'] = disk['total']
            stats['storage_server.disk_used'] = disk['used']
            stats['storage_server.disk_free_for_root'] = disk['free_for_root']
            stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
            stats['storage_server.disk_avail'] = disk['avail']
        except AttributeError:
            # platform without a disk-stats API; handler body not visible
        except EnvironmentError:
            log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)

        if self.readonly_storage:
            # a read-only server advertises no available space
            stats['storage_server.disk_avail'] = 0

        stats['storage_server.accepting_immutable_shares'] = int(writeable)
        s = self.bucket_counter.get_state()
        bucket_count = s.get("last-complete-bucket-count")
        # the crawler may not have finished a pass yet, in which case the
        # count is absent; only report it when present
            stats['storage_server.total_bucket_count'] = bucket_count
207 def get_available_space(self):
208 """Returns available space for share storage in bytes, or None if no
209 API to get this information is available."""
211 if self.readonly_storage:
213 return fileutil.get_available_space(self.sharedir, self.reserved_space)
215 def allocated_size(self):
217 for bw in self._active_writers:
218 space += bw.allocated_size()
221 def remote_get_version(self):
222 remaining_space = self.get_available_space()
223 if remaining_space is None:
224 # We're on a platform that has no API to get disk stats.
225 remaining_space = 2**64
227 version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
228 { "maximum-immutable-share-size": remaining_space,
229 "maximum-mutable-share-size": MAX_MUTABLE_SHARE_SIZE,
230 "tolerates-immutable-read-overrun": True,
231 "delete-mutable-shares-with-zero-length-writev": True,
232 "fills-holes-with-zero-bytes": True,
233 "prevents-read-past-end-of-share-data": True,
235 "application-version": str(allmydata.__full_version__),
    def remote_allocate_buckets(self, storage_index,
                                renew_secret, cancel_secret,
                                sharenums, allocated_size,
                                canary, owner_num=0):
        """Allocate BucketWriters for the requested immutable shares.

        Returns (alreadygot, bucketwriters): the set of share numbers this
        server already holds, and a dict shnum -> BucketWriter for the
        shares it is willing to accept now.
        """
        # owner_num is not for clients to set, but rather it should be
        # curried into the PersonalStorageServer instance that is dedicated
        # to a particular owner.
        self.count("allocate")
        # NOTE(review): 'start' and 'alreadygot' are used below but their
        # initializations (start = time.time(); alreadygot = set()) are not
        # visible here -- confirm.
        bucketwriters = {} # k: shnum, v: BucketWriter
        si_dir = storage_index_to_dir(storage_index)
        si_s = si_b2a(storage_index)

        log.msg("storage: allocate_buckets %s" % si_s)

        # in this implementation, the lease information (including secrets)
        # goes into the share files themselves. It could also be put into a
        # separate database. Note that the lease should not be added until
        # the BucketWriter has been closed.
        expire_time = time.time() + 31*24*60*60  # leases last one month
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        max_space_per_bucket = allocated_size

        remaining_space = self.get_available_space()
        limited = remaining_space is not None
            # this is a bit conservative, since some of this allocated_size()
            # has already been written to disk, where it will show up in
            # get_available_space.
            remaining_space -= self.allocated_size()
        # self.readonly_storage causes remaining_space <= 0

        # fill alreadygot with all shares that we have, not just the ones
        # they asked about: this will save them a lot of work. Add or update
        # leases for all of them: if they want us to hold shares for this
        # file, they'll want us to hold leases for this file.
        for (shnum, fn) in self._get_bucket_shares(storage_index):
            alreadygot.add(shnum)
            # NOTE(review): 'sf' is not assigned on any visible line
            # (presumably sf = ShareFile(fn)) -- confirm.
            sf.add_or_renew_lease(lease_info)

        for shnum in sharenums:
            incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
            finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
            if os.path.exists(finalhome):
                # great! we already have it. easy.
            elif os.path.exists(incominghome):
                # Note that we don't create BucketWriters for shnums that
                # have a partial share (in incoming/), so if a second upload
                # occurs while the first is still in progress, the second
                # uploader will use different storage servers.
            elif (not limited) or (remaining_space >= max_space_per_bucket):
                # ok! we need to create the new share file.
                bw = BucketWriter(self, incominghome, finalhome,
                                  max_space_per_bucket, lease_info, canary)
                # NOTE(review): discard mode; presumably guarded by
                # 'if self.no_storage:' on a line not visible here -- confirm.
                bw.throw_out_all_data = True
                bucketwriters[shnum] = bw
                self._active_writers[bw] = 1
                remaining_space -= max_space_per_bucket
            # bummer! not enough space to accept this bucket

            # NOTE(review): presumably guarded by 'if bucketwriters:' --
            # confirm before relying on when this directory is created.
            fileutil.make_dirs(os.path.join(self.sharedir, si_dir))

        self.add_latency("allocate", time.time() - start)
        return alreadygot, bucketwriters
316 def _iter_share_files(self, storage_index):
317 for shnum, filename in self._get_bucket_shares(storage_index):
318 f = open(filename, 'rb')
321 if header[:32] == MutableShareFile.MAGIC:
322 sf = MutableShareFile(filename, self)
323 # note: if the share has been migrated, the renew_lease()
324 # call will throw an exception, with information to help the
325 # client update the lease.
326 elif header[:4] == struct.pack(">L", 1):
327 sf = ShareFile(filename)
329 continue # non-sharefile
332 def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
335 self.count("add-lease")
336 new_expire_time = time.time() + 31*24*60*60
337 lease_info = LeaseInfo(owner_num,
338 renew_secret, cancel_secret,
339 new_expire_time, self.my_nodeid)
340 for sf in self._iter_share_files(storage_index):
341 sf.add_or_renew_lease(lease_info)
342 self.add_latency("add-lease", time.time() - start)
345 def remote_renew_lease(self, storage_index, renew_secret):
348 new_expire_time = time.time() + 31*24*60*60
349 found_buckets = False
350 for sf in self._iter_share_files(storage_index):
352 sf.renew_lease(renew_secret, new_expire_time)
353 self.add_latency("renew", time.time() - start)
354 if not found_buckets:
355 raise IndexError("no such lease to renew")
357 def bucket_writer_closed(self, bw, consumed_size):
358 if self.stats_provider:
359 self.stats_provider.count('storage_server.bytes_added', consumed_size)
360 del self._active_writers[bw]
362 def _get_bucket_shares(self, storage_index):
363 """Return a list of (shnum, pathname) tuples for files that hold
364 shares for this storage_index. In each tuple, 'shnum' will always be
365 the integer form of the last component of 'pathname'."""
366 storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
368 for f in os.listdir(storagedir):
370 filename = os.path.join(storagedir, f)
371 yield (int(f), filename)
373 # Commonly caused by there being no buckets at all.
376 def remote_get_buckets(self, storage_index):
379 si_s = si_b2a(storage_index)
380 log.msg("storage: get_buckets %s" % si_s)
381 bucketreaders = {} # k: sharenum, v: BucketReader
382 for shnum, filename in self._get_bucket_shares(storage_index):
383 bucketreaders[shnum] = BucketReader(self, filename,
384 storage_index, shnum)
385 self.add_latency("get", time.time() - start)
388 def get_leases(self, storage_index):
389 """Provide an iterator that yields all of the leases attached to this
390 bucket. Each lease is returned as a LeaseInfo instance.
392 This method is not for client use.
395 # since all shares get the same lease data, we just grab the leases
396 # from the first share
398 shnum, filename = self._get_bucket_shares(storage_index).next()
399 sf = ShareFile(filename)
400 return sf.get_leases()
401 except StopIteration:
    def remote_slot_testv_and_readv_and_writev(self, storage_index,
                                               test_and_write_vectors,
        """Atomic test-and-set on a mutable slot.

        Checks the write-enabler on every existing share, evaluates the
        test vectors, gathers the requested reads before any write, then
        applies the write vectors and refreshes the lease. Returns
        (testv_is_good, read_data).

        NOTE(review): the remainder of the signature (presumably
        'secrets, ... read_vector):') is not visible here -- confirm.
        """
        si_s = si_b2a(storage_index)
        log.msg("storage: slot_writev %s" % si_s)
        si_dir = storage_index_to_dir(storage_index)
        (write_enabler, renew_secret, cancel_secret) = secrets
        # shares exist if there is a file for them
        bucketdir = os.path.join(self.sharedir, si_dir)
        # NOTE(review): 'shares', 'testv_is_good', 'read_data', 'ownerid'
        # and 'start' are used below but their initializations are not
        # visible here -- confirm.
        if os.path.isdir(bucketdir):
            for sharenum_s in os.listdir(bucketdir):
                    # non-numeric entries are presumably skipped by a
                    # try/except around this conversion (not visible here)
                    sharenum = int(sharenum_s)
                filename = os.path.join(bucketdir, sharenum_s)
                msf = MutableShareFile(filename, self)
                # raises if the caller's write-enabler does not match
                msf.check_write_enabler(write_enabler, si_s)
                shares[sharenum] = msf
        # write_enabler is good for all existing shares.

        # Now evaluate test vectors.
        for sharenum in test_and_write_vectors:
            (testv, datav, new_length) = test_and_write_vectors[sharenum]
            if sharenum in shares:
                if not shares[sharenum].check_testv(testv):
                    self.log("testv failed: [%d]: %r" % (sharenum, testv))
                    testv_is_good = False
                # compare the vectors against an empty share, in which all
                # reads return empty strings.
                if not EmptyShare().check_testv(testv):
                    self.log("testv failed (empty): [%d] %r" % (sharenum,
                    testv_is_good = False

        # now gather the read vectors, before we do any writes
        for sharenum, share in shares.items():
            read_data[sharenum] = share.readv(read_vector)

        expire_time = time.time() + 31*24*60*60 # one month
        lease_info = LeaseInfo(ownerid,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        # now apply the write vectors
        for sharenum in test_and_write_vectors:
            (testv, datav, new_length) = test_and_write_vectors[sharenum]
                # truncating to zero length deletes the share
                if sharenum in shares:
                    shares[sharenum].unlink()
                if sharenum not in shares:
                    # allocate a new share
                    allocated_size = 2000 # arbitrary, really
                    share = self._allocate_slot_share(bucketdir, secrets,
                    shares[sharenum] = share
                shares[sharenum].writev(datav, new_length)
                # and update the lease
                shares[sharenum].add_or_renew_lease(lease_info)

        # delete empty bucket directories
        if not os.listdir(bucketdir):

        self.add_latency("writev", time.time() - start)
        return (testv_is_good, read_data)
488 def _allocate_slot_share(self, bucketdir, secrets, sharenum,
489 allocated_size, owner_num=0):
490 (write_enabler, renew_secret, cancel_secret) = secrets
491 my_nodeid = self.my_nodeid
492 fileutil.make_dirs(bucketdir)
493 filename = os.path.join(bucketdir, "%d" % sharenum)
494 share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
498 def remote_slot_readv(self, storage_index, shares, readv):
501 si_s = si_b2a(storage_index)
502 lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
503 facility="tahoe.storage", level=log.OPERATIONAL)
504 si_dir = storage_index_to_dir(storage_index)
505 # shares exist if there is a file for them
506 bucketdir = os.path.join(self.sharedir, si_dir)
507 if not os.path.isdir(bucketdir):
508 self.add_latency("readv", time.time() - start)
511 for sharenum_s in os.listdir(bucketdir):
513 sharenum = int(sharenum_s)
516 if sharenum in shares or not shares:
517 filename = os.path.join(bucketdir, sharenum_s)
518 msf = MutableShareFile(filename, self)
519 datavs[sharenum] = msf.readv(readv)
520 log.msg("returning shares %s" % (datavs.keys(),),
521 facility="tahoe.storage", level=log.NOISY, parent=lp)
522 self.add_latency("readv", time.time() - start)
525 def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
527 fileutil.make_dirs(self.corruption_advisory_dir)
528 now = time_format.iso_utc(sep="T")
529 si_s = si_b2a(storage_index)
530 # windows can't handle colons in the filename
531 fn = os.path.join(self.corruption_advisory_dir,
532 "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
534 f.write("report: Share Corruption\n")
535 f.write("type: %s\n" % share_type)
536 f.write("storage_index: %s\n" % si_s)
537 f.write("share_number: %d\n" % shnum)
542 log.msg(format=("client claims corruption in (%(share_type)s) " +
543 "%(si)s-%(shnum)d: %(reason)s"),
544 share_type=share_type, si=si_s, shnum=shnum, reason=reason,
545 level=log.SCARY, umid="SGx2fA")