1 import os, re, weakref, struct, time
3 from foolscap.api import Referenceable
4 from twisted.application import service
6 from zope.interface import implements
7 from allmydata.interfaces import RIStorageServer, IStatsProducer
8 from allmydata.util import fileutil, idlib, log, time_format
9 import allmydata # for __full_version__
11 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
12 _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
13 from allmydata.storage.lease import LeaseInfo
14 from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
15 create_mutable_sharefile
16 from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
17 from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
18 from allmydata.storage.crawler import BucketCountingCrawler
19 from allmydata.storage.expirer import LeaseCheckingCrawler
22 # storage/shares/incoming
23 # incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
24 # be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
25 # storage/shares/$START/$STORAGEINDEX
26 # storage/shares/$START/$STORAGEINDEX/$SHARENUM
28 # Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
31 # $SHARENUM matches this regex:
32 NUM_RE=re.compile("^[0-9]+$")
36 class StorageServer(service.MultiService, Referenceable):
37 implements(RIStorageServer, IStatsProducer)
39 LeaseCheckerClass = LeaseCheckingCrawler
41 def __init__(self, storedir, nodeid, reserved_space=0,
42 discard_storage=False, readonly_storage=False,
44 expiration_enabled=False,
45 expiration_mode="age",
46 expiration_override_lease_duration=None,
47 expiration_cutoff_date=None,
48 expiration_sharetypes=("mutable", "immutable")):
49 service.MultiService.__init__(self)
50 assert isinstance(nodeid, str)
51 assert len(nodeid) == 20
52 self.my_nodeid = nodeid
53 self.storedir = storedir
54 sharedir = os.path.join(storedir, "shares")
55 fileutil.make_dirs(sharedir)
56 self.sharedir = sharedir
57 # we don't actually create the corruption-advisory dir until necessary
58 self.corruption_advisory_dir = os.path.join(storedir,
59 "corruption-advisories")
60 self.reserved_space = int(reserved_space)
61 self.no_storage = discard_storage
62 self.readonly_storage = readonly_storage
63 self.stats_provider = stats_provider
64 if self.stats_provider:
65 self.stats_provider.register_producer(self)
66 self.incomingdir = os.path.join(sharedir, 'incoming')
67 self._clean_incomplete()
68 fileutil.make_dirs(self.incomingdir)
69 self._active_writers = weakref.WeakKeyDictionary()
70 log.msg("StorageServer created", facility="tahoe.storage")
73 if self.get_available_space() is None:
74 log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
75 umin="0wZ27w", level=log.UNUSUAL)
77 self.latencies = {"allocate": [], # immutable
82 "writev": [], # mutable
84 "add-lease": [], # both
88 self.add_bucket_counter()
90 statefile = os.path.join(self.storedir, "lease_checker.state")
91 historyfile = os.path.join(self.storedir, "lease_checker.history")
92 klass = self.LeaseCheckerClass
93 self.lease_checker = klass(self, statefile, historyfile,
94 expiration_enabled, expiration_mode,
95 expiration_override_lease_duration,
96 expiration_cutoff_date,
97 expiration_sharetypes)
98 self.lease_checker.setServiceParent(self)
101 return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
103 def have_shares(self):
104 # quick test to decide if we need to commit to an implicit
105 # permutation-seed or if we should use a new one
106 return bool(set(os.listdir(self.sharedir)) - set(["incoming"]))
108 def add_bucket_counter(self):
109 statefile = os.path.join(self.storedir, "bucket_counter.state")
110 self.bucket_counter = BucketCountingCrawler(self, statefile)
111 self.bucket_counter.setServiceParent(self)
113 def count(self, name, delta=1):
114 if self.stats_provider:
115 self.stats_provider.count("storage_server." + name, delta)
117 def add_latency(self, category, latency):
118 a = self.latencies[category]
121 self.latencies[category] = a[-1000:]
123 def get_latencies(self):
124 """Return a dict, indexed by category, that contains a dict of
125 latency numbers for each category. If there are sufficient samples
126 for unambiguous interpretation, each dict will contain the
127 following keys: mean, 01_0_percentile, 10_0_percentile,
128 50_0_percentile (median), 90_0_percentile, 95_0_percentile,
129 99_0_percentile, 99_9_percentile. If there are insufficient
130 samples for a given percentile to be interpreted unambiguously
131 that percentile will be reported as None. If no samples have been
132 collected for the given category, then that category name will
133 not be present in the return value. """
134 # note that Amazon's Dynamo paper says they use 99.9% percentile.
136 for category in self.latencies:
137 if not self.latencies[category]:
140 samples = self.latencies[category][:]
142 stats["samplesize"] = count
145 stats["mean"] = sum(samples) / count
149 orderstatlist = [(0.01, "01_0_percentile", 100), (0.1, "10_0_percentile", 10),\
150 (0.50, "50_0_percentile", 10), (0.90, "90_0_percentile", 10),\
151 (0.95, "95_0_percentile", 20), (0.99, "99_0_percentile", 100),\
152 (0.999, "99_9_percentile", 1000)]
154 for percentile, percentilestring, minnumtoobserve in orderstatlist:
155 if count >= minnumtoobserve:
156 stats[percentilestring] = samples[int(percentile*count)]
158 stats[percentilestring] = None
160 output[category] = stats
163 def log(self, *args, **kwargs):
164 if "facility" not in kwargs:
165 kwargs["facility"] = "tahoe.storage"
166 return log.msg(*args, **kwargs)
168 def _clean_incomplete(self):
169 fileutil.rm_dir(self.incomingdir)
172 # remember: RIStatsProvider requires that our return dict
173 # contains numeric values.
174 stats = { 'storage_server.allocated': self.allocated_size(), }
175 stats['storage_server.reserved_space'] = self.reserved_space
176 for category,ld in self.get_latencies().items():
177 for name,v in ld.items():
178 stats['storage_server.latencies.%s.%s' % (category, name)] = v
181 disk = fileutil.get_disk_stats(self.sharedir, self.reserved_space)
182 writeable = disk['avail'] > 0
184 # spacetime predictors should use disk_avail / (d(disk_used)/dt)
185 stats['storage_server.disk_total'] = disk['total']
186 stats['storage_server.disk_used'] = disk['used']
187 stats['storage_server.disk_free_for_root'] = disk['free_for_root']
188 stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
189 stats['storage_server.disk_avail'] = disk['avail']
190 except AttributeError:
192 except EnvironmentError:
193 log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
196 if self.readonly_storage:
197 stats['storage_server.disk_avail'] = 0
200 stats['storage_server.accepting_immutable_shares'] = int(writeable)
201 s = self.bucket_counter.get_state()
202 bucket_count = s.get("last-complete-bucket-count")
204 stats['storage_server.total_bucket_count'] = bucket_count
207 def get_available_space(self):
208 """Returns available space for share storage in bytes, or None if no
209 API to get this information is available."""
211 if self.readonly_storage:
213 return fileutil.get_available_space(self.sharedir, self.reserved_space)
215 def allocated_size(self):
217 for bw in self._active_writers:
218 space += bw.allocated_size()
221 def remote_get_version(self):
222 remaining_space = self.get_available_space()
223 if remaining_space is None:
224 # We're on a platform that has no API to get disk stats.
225 remaining_space = 2**64
227 version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
228 { "maximum-immutable-share-size": remaining_space,
229 "maximum-mutable-share-size": MAX_MUTABLE_SHARE_SIZE,
230 "available-space": remaining_space,
231 "tolerates-immutable-read-overrun": True,
232 "delete-mutable-shares-with-zero-length-writev": True,
233 "fills-holes-with-zero-bytes": True,
234 "prevents-read-past-end-of-share-data": True,
236 "application-version": str(allmydata.__full_version__),
240 def remote_allocate_buckets(self, storage_index,
241 renew_secret, cancel_secret,
242 sharenums, allocated_size,
243 canary, owner_num=0):
244 # owner_num is not for clients to set, but rather it should be
245 # curried into the PersonalStorageServer instance that is dedicated
246 # to a particular owner.
248 self.count("allocate")
250 bucketwriters = {} # k: shnum, v: BucketWriter
251 si_dir = storage_index_to_dir(storage_index)
252 si_s = si_b2a(storage_index)
254 log.msg("storage: allocate_buckets %s" % si_s)
256 # in this implementation, the lease information (including secrets)
257 # goes into the share files themselves. It could also be put into a
258 # separate database. Note that the lease should not be added until
259 # the BucketWriter has been closed.
260 expire_time = time.time() + 31*24*60*60
261 lease_info = LeaseInfo(owner_num,
262 renew_secret, cancel_secret,
263 expire_time, self.my_nodeid)
265 max_space_per_bucket = allocated_size
267 remaining_space = self.get_available_space()
268 limited = remaining_space is not None
270 # this is a bit conservative, since some of this allocated_size()
271 # has already been written to disk, where it will show up in
272 # get_available_space.
273 remaining_space -= self.allocated_size()
274 # self.readonly_storage causes remaining_space <= 0
276 # fill alreadygot with all shares that we have, not just the ones
277 # they asked about: this will save them a lot of work. Add or update
278 # leases for all of them: if they want us to hold shares for this
279 # file, they'll want us to hold leases for this file.
280 for (shnum, fn) in self._get_bucket_shares(storage_index):
281 alreadygot.add(shnum)
283 sf.add_or_renew_lease(lease_info)
285 for shnum in sharenums:
286 incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
287 finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
288 if os.path.exists(finalhome):
289 # great! we already have it. easy.
291 elif os.path.exists(incominghome):
292 # Note that we don't create BucketWriters for shnums that
293 # have a partial share (in incoming/), so if a second upload
294 # occurs while the first is still in progress, the second
295 # uploader will use different storage servers.
297 elif (not limited) or (remaining_space >= max_space_per_bucket):
298 # ok! we need to create the new share file.
299 bw = BucketWriter(self, incominghome, finalhome,
300 max_space_per_bucket, lease_info, canary)
302 bw.throw_out_all_data = True
303 bucketwriters[shnum] = bw
304 self._active_writers[bw] = 1
306 remaining_space -= max_space_per_bucket
308 # bummer! not enough space to accept this bucket
312 fileutil.make_dirs(os.path.join(self.sharedir, si_dir))
314 self.add_latency("allocate", time.time() - start)
315 return alreadygot, bucketwriters
317 def _iter_share_files(self, storage_index):
318 for shnum, filename in self._get_bucket_shares(storage_index):
319 f = open(filename, 'rb')
322 if header[:32] == MutableShareFile.MAGIC:
323 sf = MutableShareFile(filename, self)
324 # note: if the share has been migrated, the renew_lease()
325 # call will throw an exception, with information to help the
326 # client update the lease.
327 elif header[:4] == struct.pack(">L", 1):
328 sf = ShareFile(filename)
330 continue # non-sharefile
333 def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
336 self.count("add-lease")
337 new_expire_time = time.time() + 31*24*60*60
338 lease_info = LeaseInfo(owner_num,
339 renew_secret, cancel_secret,
340 new_expire_time, self.my_nodeid)
341 for sf in self._iter_share_files(storage_index):
342 sf.add_or_renew_lease(lease_info)
343 self.add_latency("add-lease", time.time() - start)
346 def remote_renew_lease(self, storage_index, renew_secret):
349 new_expire_time = time.time() + 31*24*60*60
350 found_buckets = False
351 for sf in self._iter_share_files(storage_index):
353 sf.renew_lease(renew_secret, new_expire_time)
354 self.add_latency("renew", time.time() - start)
355 if not found_buckets:
356 raise IndexError("no such lease to renew")
358 def bucket_writer_closed(self, bw, consumed_size):
359 if self.stats_provider:
360 self.stats_provider.count('storage_server.bytes_added', consumed_size)
361 del self._active_writers[bw]
363 def _get_bucket_shares(self, storage_index):
364 """Return a list of (shnum, pathname) tuples for files that hold
365 shares for this storage_index. In each tuple, 'shnum' will always be
366 the integer form of the last component of 'pathname'."""
367 storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
369 for f in os.listdir(storagedir):
371 filename = os.path.join(storagedir, f)
372 yield (int(f), filename)
374 # Commonly caused by there being no buckets at all.
377 def remote_get_buckets(self, storage_index):
380 si_s = si_b2a(storage_index)
381 log.msg("storage: get_buckets %s" % si_s)
382 bucketreaders = {} # k: sharenum, v: BucketReader
383 for shnum, filename in self._get_bucket_shares(storage_index):
384 bucketreaders[shnum] = BucketReader(self, filename,
385 storage_index, shnum)
386 self.add_latency("get", time.time() - start)
389 def get_leases(self, storage_index):
390 """Provide an iterator that yields all of the leases attached to this
391 bucket. Each lease is returned as a LeaseInfo instance.
393 This method is not for client use.
396 # since all shares get the same lease data, we just grab the leases
397 # from the first share
399 shnum, filename = self._get_bucket_shares(storage_index).next()
400 sf = ShareFile(filename)
401 return sf.get_leases()
402 except StopIteration:
405 def remote_slot_testv_and_readv_and_writev(self, storage_index,
407 test_and_write_vectors,
411 si_s = si_b2a(storage_index)
412 log.msg("storage: slot_writev %s" % si_s)
413 si_dir = storage_index_to_dir(storage_index)
414 (write_enabler, renew_secret, cancel_secret) = secrets
415 # shares exist if there is a file for them
416 bucketdir = os.path.join(self.sharedir, si_dir)
418 if os.path.isdir(bucketdir):
419 for sharenum_s in os.listdir(bucketdir):
421 sharenum = int(sharenum_s)
424 filename = os.path.join(bucketdir, sharenum_s)
425 msf = MutableShareFile(filename, self)
426 msf.check_write_enabler(write_enabler, si_s)
427 shares[sharenum] = msf
428 # write_enabler is good for all existing shares.
430 # Now evaluate test vectors.
432 for sharenum in test_and_write_vectors:
433 (testv, datav, new_length) = test_and_write_vectors[sharenum]
434 if sharenum in shares:
435 if not shares[sharenum].check_testv(testv):
436 self.log("testv failed: [%d]: %r" % (sharenum, testv))
437 testv_is_good = False
440 # compare the vectors against an empty share, in which all
441 # reads return empty strings.
442 if not EmptyShare().check_testv(testv):
443 self.log("testv failed (empty): [%d] %r" % (sharenum,
445 testv_is_good = False
448 # now gather the read vectors, before we do any writes
450 for sharenum, share in shares.items():
451 read_data[sharenum] = share.readv(read_vector)
454 expire_time = time.time() + 31*24*60*60 # one month
455 lease_info = LeaseInfo(ownerid,
456 renew_secret, cancel_secret,
457 expire_time, self.my_nodeid)
460 # now apply the write vectors
461 for sharenum in test_and_write_vectors:
462 (testv, datav, new_length) = test_and_write_vectors[sharenum]
464 if sharenum in shares:
465 shares[sharenum].unlink()
467 if sharenum not in shares:
468 # allocate a new share
469 allocated_size = 2000 # arbitrary, really
470 share = self._allocate_slot_share(bucketdir, secrets,
474 shares[sharenum] = share
475 shares[sharenum].writev(datav, new_length)
476 # and update the lease
477 shares[sharenum].add_or_renew_lease(lease_info)
480 # delete empty bucket directories
481 if not os.listdir(bucketdir):
486 self.add_latency("writev", time.time() - start)
487 return (testv_is_good, read_data)
489 def _allocate_slot_share(self, bucketdir, secrets, sharenum,
490 allocated_size, owner_num=0):
491 (write_enabler, renew_secret, cancel_secret) = secrets
492 my_nodeid = self.my_nodeid
493 fileutil.make_dirs(bucketdir)
494 filename = os.path.join(bucketdir, "%d" % sharenum)
495 share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
499 def remote_slot_readv(self, storage_index, shares, readv):
502 si_s = si_b2a(storage_index)
503 lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
504 facility="tahoe.storage", level=log.OPERATIONAL)
505 si_dir = storage_index_to_dir(storage_index)
506 # shares exist if there is a file for them
507 bucketdir = os.path.join(self.sharedir, si_dir)
508 if not os.path.isdir(bucketdir):
509 self.add_latency("readv", time.time() - start)
512 for sharenum_s in os.listdir(bucketdir):
514 sharenum = int(sharenum_s)
517 if sharenum in shares or not shares:
518 filename = os.path.join(bucketdir, sharenum_s)
519 msf = MutableShareFile(filename, self)
520 datavs[sharenum] = msf.readv(readv)
521 log.msg("returning shares %s" % (datavs.keys(),),
522 facility="tahoe.storage", level=log.NOISY, parent=lp)
523 self.add_latency("readv", time.time() - start)
526 def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
528 fileutil.make_dirs(self.corruption_advisory_dir)
529 now = time_format.iso_utc(sep="T")
530 si_s = si_b2a(storage_index)
531 # windows can't handle colons in the filename
532 fn = os.path.join(self.corruption_advisory_dir,
533 "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
535 f.write("report: Share Corruption\n")
536 f.write("type: %s\n" % share_type)
537 f.write("storage_index: %s\n" % si_s)
538 f.write("share_number: %d\n" % shnum)
543 log.msg(format=("client claims corruption in (%(share_type)s) " +
544 "%(si)s-%(shnum)d: %(reason)s"),
545 share_type=share_type, si=si_s, shnum=shnum, reason=reason,
546 level=log.SCARY, umid="SGx2fA")