1 import os, re, weakref, struct, time
3 from foolscap.api import Referenceable
4 from twisted.application import service
6 from zope.interface import implements
7 from allmydata.interfaces import RIStorageServer, IStatsProducer
8 from allmydata.util import fileutil, idlib, log, time_format
9 import allmydata # for __full_version__
11 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
12 _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
13 from allmydata.storage.lease import LeaseInfo
14 from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
15 create_mutable_sharefile
16 from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
17 from allmydata.storage.crawler import BucketCountingCrawler
18 from allmydata.storage.expirer import LeaseCheckingCrawler
21 # storage/shares/incoming
22 # incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
23 # be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
24 # storage/shares/$START/$STORAGEINDEX
25 # storage/shares/$START/$STORAGEINDEX/$SHARENUM
27 # Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2 base-32 chars).
30 # $SHARENUM matches this regex:
# share filenames must be pure decimal integers (the share number)
NUM_RE=re.compile("^[0-9]+$")
class StorageServer(service.MultiService, Referenceable):
    # Foolscap-published storage server: serves the RIStorageServer remote
    # interface and produces stats for the node's stats gatherer.
    implements(RIStorageServer, IStatsProducer)

    # Crawler class used for lease expiration; a class attribute,
    # presumably so tests can substitute a different crawler — confirm.
    LeaseCheckerClass = LeaseCheckingCrawler
def __init__(self, storedir, nodeid, reserved_space=0,
             discard_storage=False, readonly_storage=False,
             # NOTE(review): the listing jumps from source line 41 to 43;
             # a "stats_provider=None," parameter is evidently elided here,
             # since self.stats_provider is assigned from it below.
             expiration_enabled=False,
             expiration_mode="age",
             expiration_override_lease_duration=None,
             expiration_cutoff_date=None,
             expiration_sharetypes=("mutable", "immutable")):
    """Create a StorageServer rooted at 'storedir'.

    nodeid must be a 20-byte binary node identifier. reserved_space is a
    byte count to keep free on disk; discard_storage puts BucketWriters
    into throw-away mode; readonly_storage makes the server advertise no
    available space. The expiration_* arguments are passed through to the
    lease-checking crawler.
    """
    service.MultiService.__init__(self)
    # NOTE(review): assert-based validation disappears under `python -O`.
    assert isinstance(nodeid, str)
    assert len(nodeid) == 20
    self.my_nodeid = nodeid
    self.storedir = storedir
    sharedir = os.path.join(storedir, "shares")
    fileutil.make_dirs(sharedir)
    self.sharedir = sharedir
    # we don't actually create the corruption-advisory dir until necessary
    self.corruption_advisory_dir = os.path.join(storedir,
                                                "corruption-advisories")
    self.reserved_space = int(reserved_space)
    self.no_storage = discard_storage
    self.readonly_storage = readonly_storage
    self.stats_provider = stats_provider
    if self.stats_provider:
        self.stats_provider.register_producer(self)
    # discard any half-uploaded shares left by a previous run, then
    # re-create the empty incoming/ directory
    self.incomingdir = os.path.join(sharedir, 'incoming')
    self._clean_incomplete()
    fileutil.make_dirs(self.incomingdir)
    # weak keys: a BucketWriter that is garbage-collected stops counting
    # toward allocated_size()
    self._active_writers = weakref.WeakKeyDictionary()
    log.msg("StorageServer created", facility="tahoe.storage")

    # NOTE(review): source lines 70-71 are elided; the warning below is
    # presumably guarded by "if reserved_space:" — confirm against the
    # full file.
    if self.get_available_space() is None:
        # NOTE(review): "umin=" is probably a typo for "umid=" — every
        # other log.msg in this file uses umid (see advise_corrupt_share).
        log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
                umin="0wZ27w", level=log.UNUSUAL)

    # Per-category lists of recent operation latencies (seconds), trimmed
    # to the most recent 1000 samples by add_latency().
    # NOTE(review): several categories and the dict's closing brace are
    # elided between source lines 76 and 87.
    self.latencies = {"allocate": [], # immutable
                      "writev": [], # mutable
                      "add-lease": [], # both
    self.add_bucket_counter()

    # set up the lease-expiration crawler as a child service
    statefile = os.path.join(self.storedir, "lease_checker.state")
    historyfile = os.path.join(self.storedir, "lease_checker.history")
    klass = self.LeaseCheckerClass
    self.lease_checker = klass(self, statefile, historyfile,
                               expiration_enabled, expiration_mode,
                               expiration_override_lease_duration,
                               expiration_cutoff_date,
                               expiration_sharetypes)
    self.lease_checker.setServiceParent(self)
# NOTE(review): the "def __repr__(self):" header (source line 99) is
# elided from this listing; the line below is its body.
    return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
def have_shares(self):
    """Return True if any share bucket exists under our share directory.

    Quick test used to decide whether to commit to an implicit
    permutation-seed or to use a new one.
    """
    entries = set(os.listdir(self.sharedir))
    entries.discard("incoming")   # incoming/ holds partial uploads, not shares
    return bool(entries)
def add_bucket_counter(self):
    """Create the bucket-counting crawler and start it as a child service."""
    state_path = os.path.join(self.storedir, "bucket_counter.state")
    self.bucket_counter = BucketCountingCrawler(self, state_path)
    self.bucket_counter.setServiceParent(self)
def count(self, name, delta=1):
    """Bump the node-wide counter 'storage_server.<name>' by delta.

    Silently does nothing when no stats provider is configured.
    """
    provider = self.stats_provider
    if provider:
        provider.count("storage_server." + name, delta)
def add_latency(self, category, latency):
    """Record one operation latency sample (seconds) under 'category'."""
    a = self.latencies[category]
    # NOTE(review): source lines 118-119 are elided here — presumably the
    # sample is appended and a length check guards the trim below, which
    # keeps only the most recent 1000 samples.
    self.latencies[category] = a[-1000:]
def get_latencies(self):
    """Return a dict, indexed by category, that contains a dict of
    latency numbers for each category. If there are sufficient samples
    for unambiguous interpretation, each dict will contain the
    following keys: mean, 01_0_percentile, 10_0_percentile,
    50_0_percentile (median), 90_0_percentile, 95_0_percentile,
    99_0_percentile, 99_9_percentile. If there are insufficient
    samples for a given percentile to be interpreted unambiguously
    that percentile will be reported as None. If no samples have been
    collected for the given category, then that category name will
    not be present in the return value. """
    # note that Amazon's Dynamo paper says they use 99.9% percentile.
    # NOTE(review): this listing elides several source lines (134,
    # 137-138, 140, 142-147, 152, 156, 158, 160-161): presumably the
    # creation of 'output' and 'stats', a skip of empty categories, the
    # computation of 'count', the sort of 'samples', the 'else:' before
    # the None assignment, and the final 'return output' — confirm
    # against the full file.
    for category in self.latencies:
        if not self.latencies[category]:
        # snapshot the live sample list before working on it
        samples = self.latencies[category][:]
        stats["samplesize"] = count
        stats["mean"] = sum(samples) / count
        # (fraction, output key, minimum samples for that percentile)
        orderstatlist = [(0.01, "01_0_percentile", 100), (0.1, "10_0_percentile", 10),\
                         (0.50, "50_0_percentile", 10), (0.90, "90_0_percentile", 10),\
                         (0.95, "95_0_percentile", 20), (0.99, "99_0_percentile", 100),\
                         (0.999, "99_9_percentile", 1000)]
        for percentile, percentilestring, minnumtoobserve in orderstatlist:
            if count >= minnumtoobserve:
                stats[percentilestring] = samples[int(percentile*count)]
            stats[percentilestring] = None
        output[category] = stats
def log(self, *args, **kwargs):
    """Log through the node logger, defaulting to the storage facility."""
    kwargs.setdefault("facility", "tahoe.storage")
    return log.msg(*args, **kwargs)
def _clean_incomplete(self):
    # Blow away incoming/: partially-uploaded shares left by a previous
    # run are useless. (__init__ re-creates the directory right after
    # calling this.)
    fileutil.rm_dir(self.incomingdir)
# NOTE(review): the "def get_stats(self):" header (source line 170) is
# elided from this listing; the lines below are its body.
    # remember: RIStatsProvider requires that our return dict
    # contains numeric values.
    stats = { 'storage_server.allocated': self.allocated_size(), }
    stats['storage_server.reserved_space'] = self.reserved_space
    # flatten per-category latency stats into dotted counter names
    for category,ld in self.get_latencies().items():
        for name,v in ld.items():
            stats['storage_server.latencies.%s.%s' % (category, name)] = v
    # NOTE(review): source lines 178-179 are elided; the disk query below
    # is presumably inside a "try:" matching the except clauses further
    # down — confirm against the full file.
    disk = fileutil.get_disk_stats(self.sharedir, self.reserved_space)
    writeable = disk['avail'] > 0
    # spacetime predictors should use disk_avail / (d(disk_used)/dt)
    stats['storage_server.disk_total'] = disk['total']
    stats['storage_server.disk_used'] = disk['used']
    stats['storage_server.disk_free_for_root'] = disk['free_for_root']
    stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
    stats['storage_server.disk_avail'] = disk['avail']
    except AttributeError:
        # NOTE(review): handler body elided (source line 190)
    except EnvironmentError:
        log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
        # NOTE(review): lines 193-194 elided after this handler
    # a read-only server never advertises available space
    if self.readonly_storage:
        stats['storage_server.disk_avail'] = 0
    # NOTE(review): lines 197-198 elided here
    stats['storage_server.accepting_immutable_shares'] = int(writeable)
    s = self.bucket_counter.get_state()
    bucket_count = s.get("last-complete-bucket-count")
    # NOTE(review): line 202 elided — presumably "if bucket_count:",
    # since get() may return None and RIStatsProvider needs numbers.
    stats['storage_server.total_bucket_count'] = bucket_count
    # NOTE(review): lines 204-205 elided — presumably "return stats".
def get_available_space(self):
    """Returns available space for share storage in bytes, or None if no
    API to get this information is available."""
    # NOTE(review): source lines 209 and 211 are elided — a read-only
    # server presumably returns 0 from the branch below rather than
    # falling through to the disk query.
    if self.readonly_storage:
    return fileutil.get_available_space(self.sharedir, self.reserved_space)
def allocated_size(self):
    # Total bytes promised to all currently-active BucketWriters.
    # NOTE(review): source lines 215 and 218 are elided — presumably
    # "space = 0" before the loop and "return space" after it.
    for bw in self._active_writers:
        space += bw.allocated_size()
def remote_get_version(self):
    """Build the version/capability dict advertised to storage clients."""
    remaining_space = self.get_available_space()
    if remaining_space is None:
        # We're on a platform that has no API to get disk stats.
        remaining_space = 2**64
    # NOTE(review): source lines 225, 232 and 234-236 are elided — the
    # nested dict's closing braces and the final "return version" are
    # missing from this listing.
    version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                { "maximum-immutable-share-size": remaining_space,
                  "tolerates-immutable-read-overrun": True,
                  "delete-mutable-shares-with-zero-length-writev": True,
                  "fills-holes-with-zero-bytes": True,
                  "prevents-read-past-end-of-share-data": True,
                "application-version": str(allmydata.__full_version__),
def remote_allocate_buckets(self, storage_index,
                            renew_secret, cancel_secret,
                            sharenums, allocated_size,
                            canary, owner_num=0):
    """Allocate BucketWriters for the requested immutable shares.

    Returns (alreadygot, bucketwriters): the share numbers we already
    hold, and a dict of shnum -> BucketWriter for the shares the client
    should upload to us.
    """
    # owner_num is not for clients to set, but rather it should be
    # curried into the PersonalStorageServer instance that is dedicated
    # to a particular owner.
    # NOTE(review): this listing elides several source lines (244, 246,
    # 250, 252, 261, 263, 266, 272, 279, 287, 293, 298, 302, 304,
    # 306-308, 310) — including the 'start' timestamp and the creation
    # of the 'alreadygot' set used below; confirm against the full file.
    self.count("allocate")
    bucketwriters = {} # k: shnum, v: BucketWriter
    si_dir = storage_index_to_dir(storage_index)
    si_s = si_b2a(storage_index)
    log.msg("storage: allocate_buckets %s" % si_s)

    # in this implementation, the lease information (including secrets)
    # goes into the share files themselves. It could also be put into a
    # separate database. Note that the lease should not be added until
    # the BucketWriter has been closed.
    expire_time = time.time() + 31*24*60*60 # 31 days
    lease_info = LeaseInfo(owner_num,
                           renew_secret, cancel_secret,
                           expire_time, self.my_nodeid)
    max_space_per_bucket = allocated_size
    remaining_space = self.get_available_space()
    limited = remaining_space is not None
    # this is a bit conservative, since some of this allocated_size()
    # has already been written to disk, where it will show up in
    # get_available_space.
    # NOTE(review): the subtraction below is presumably guarded by
    # "if limited:" (source line 266 is elided).
    remaining_space -= self.allocated_size()
    # self.readonly_storage causes remaining_space <= 0

    # fill alreadygot with all shares that we have, not just the ones
    # they asked about: this will save them a lot of work. Add or update
    # leases for all of them: if they want us to hold shares for this
    # file, they'll want us to hold leases for this file.
    for (shnum, fn) in self._get_bucket_shares(storage_index):
        alreadygot.add(shnum)
        # NOTE(review): the construction of 'sf' from fn is elided
        # (source line 279).
        sf.add_or_renew_lease(lease_info)

    for shnum in sharenums:
        incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
        finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
        if os.path.exists(finalhome):
            # great! we already have it. easy.
        elif os.path.exists(incominghome):
            # Note that we don't create BucketWriters for shnums that
            # have a partial share (in incoming/), so if a second upload
            # occurs while the first is still in progress, the second
            # uploader will use different storage servers.
        elif (not limited) or (remaining_space >= max_space_per_bucket):
            # ok! we need to create the new share file.
            bw = BucketWriter(self, incominghome, finalhome,
                              max_space_per_bucket, lease_info, canary)
            # NOTE(review): presumably guarded by "if self.no_storage:"
            # (source line 298 elided) — discard mode throws data away.
            bw.throw_out_all_data = True
            bucketwriters[shnum] = bw
            self._active_writers[bw] = 1
            # NOTE(review): presumably guarded by "if limited:" (302).
            remaining_space -= max_space_per_bucket
        # NOTE(review): the final 'else' branch is elided (lines 304,
        # 306-308); its comment survives below.
        # bummer! not enough space to accept this bucket

    # NOTE(review): presumably guarded by "if bucketwriters:" (line 310
    # elided) — create the bucket directory only when needed.
    fileutil.make_dirs(os.path.join(self.sharedir, si_dir))

    self.add_latency("allocate", time.time() - start)
    return alreadygot, bucketwriters
def _iter_share_files(self, storage_index):
    """Yield a share object (MutableShareFile or ShareFile) for each
    share file we hold under storage_index, skipping non-sharefiles."""
    for shnum, filename in self._get_bucket_shares(storage_index):
        f = open(filename, 'rb')
        # NOTE(review): reading the header from f (and closing f) is
        # elided (source lines 317-318).
        if header[:32] == MutableShareFile.MAGIC:
            sf = MutableShareFile(filename, self)
            # note: if the share has been migrated, the renew_lease()
            # call will throw an exception, with information to help the
            # client update the lease.
        elif header[:4] == struct.pack(">L", 1):
            # immutable share files start with a big-endian version of 1
            sf = ShareFile(filename)
        # NOTE(review): the 'else:' header and the 'yield sf' are elided
        # (source lines 326, 328-329); 'continue' below belongs to the
        # else branch.
            continue # non-sharefile
def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
    # NOTE(review): the remainder of the signature — evidently an
    # owner_num parameter, used below — is elided (source lines 331-332),
    # as is a 'start' timestamp used by add_latency() at the end.
    """Add or renew a 31-day lease on every share of this bucket."""
    self.count("add-lease")
    new_expire_time = time.time() + 31*24*60*60 # 31 days
    lease_info = LeaseInfo(owner_num,
                           renew_secret, cancel_secret,
                           new_expire_time, self.my_nodeid)
    for sf in self._iter_share_files(storage_index):
        sf.add_or_renew_lease(lease_info)
    self.add_latency("add-lease", time.time() - start)
def remote_renew_lease(self, storage_index, renew_secret):
    """Extend the lease matching renew_secret on all shares of this
    bucket by 31 days; raise IndexError if no shares were found."""
    # NOTE(review): source lines 344-345 and 349 are elided — including
    # whatever sets found_buckets = True inside the loop, and presumably
    # a 'start' timestamp and self.count("renew"); confirm against the
    # full file.
    new_expire_time = time.time() + 31*24*60*60 # 31 days
    found_buckets = False
    for sf in self._iter_share_files(storage_index):
        sf.renew_lease(renew_secret, new_expire_time)
    self.add_latency("renew", time.time() - start)
    if not found_buckets:
        raise IndexError("no such lease to renew")
def bucket_writer_closed(self, bw, consumed_size):
    """Called when a BucketWriter finishes: credit the bytes written to
    the stats provider and retire the writer from the active set."""
    provider = self.stats_provider
    if provider:
        provider.count('storage_server.bytes_added', consumed_size)
    del self._active_writers[bw]
def _get_bucket_shares(self, storage_index):
    """Return a list of (shnum, pathname) tuples for files that hold
    shares for this storage_index. In each tuple, 'shnum' will always be
    the integer form of the last component of 'pathname'."""
    storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
    # NOTE(review): source lines 365, 367, 370 and 372 are elided — the
    # listdir is presumably wrapped in try/except OSError, and filenames
    # are presumably filtered (NUM_RE) before the int() conversion;
    # confirm against the full file.
    for f in os.listdir(storagedir):
        filename = os.path.join(storagedir, f)
        yield (int(f), filename)
    # Commonly caused by there being no buckets at all.
def remote_get_buckets(self, storage_index):
    """Return a dict of sharenum -> BucketReader for every immutable
    share we hold under storage_index."""
    # NOTE(review): source lines 375-376 and 384-385 are elided —
    # presumably the 'start' timestamp, self.count("get"), and the final
    # 'return bucketreaders'.
    si_s = si_b2a(storage_index)
    log.msg("storage: get_buckets %s" % si_s)
    bucketreaders = {} # k: sharenum, v: BucketReader
    for shnum, filename in self._get_bucket_shares(storage_index):
        bucketreaders[shnum] = BucketReader(self, filename,
                                            storage_index, shnum)
    self.add_latency("get", time.time() - start)
def get_leases(self, storage_index):
    """Provide an iterator that yields all of the leases attached to this
    bucket. Each lease is returned as a LeaseInfo instance.

    This method is not for client use.
    """
    # since all shares get the same lease data, we just grab the leases
    # from the first share
    # NOTE(review): the 'try:' line and the StopIteration handler body
    # (presumably returning an empty iterator) are elided (source lines
    # 395, 400-401). Note the py2-only .next() call.
    shnum, filename = self._get_bucket_shares(storage_index).next()
    sf = ShareFile(filename)
    return sf.get_leases()
    except StopIteration:
def remote_slot_testv_and_readv_and_writev(self, storage_index,
    # NOTE(review): the rest of the signature is elided (source lines
    # 403, 405-407) — a 'secrets' tuple, a read_vector parameter and a
    # 'start' timestamp are evidently among the missing pieces, since
    # all are used below.
                                           test_and_write_vectors,
    """Atomic test-and-set on the mutable-slot shares of this bucket.

    Checks the write-enabler, evaluates the test vectors against the
    existing shares, gathers the read vectors, and — when every test
    passed — applies the write vectors and refreshes the lease.
    Returns (testv_is_good, read_data).
    """
    si_s = si_b2a(storage_index)
    log.msg("storage: slot_writev %s" % si_s)
    si_dir = storage_index_to_dir(storage_index)
    (write_enabler, renew_secret, cancel_secret) = secrets
    # shares exist if there is a file for them
    bucketdir = os.path.join(self.sharedir, si_dir)
    # NOTE(review): the creation of the 'shares' dict is elided (414).
    if os.path.isdir(bucketdir):
        for sharenum_s in os.listdir(bucketdir):
            # NOTE(review): lines 417 and 419-420 are elided — filenames
            # are presumably filtered and non-integers skipped.
            sharenum = int(sharenum_s)
            filename = os.path.join(bucketdir, sharenum_s)
            msf = MutableShareFile(filename, self)
            # presumably raises on a bad secret — confirm semantics
            msf.check_write_enabler(write_enabler, si_s)
            shares[sharenum] = msf
    # write_enabler is good for all existing shares.

    # Now evaluate test vectors.
    # NOTE(review): 'testv_is_good = True' is presumably initialized
    # here (line 428 elided), and break statements after failures
    # (435-436, 443-444) are also elided.
    for sharenum in test_and_write_vectors:
        (testv, datav, new_length) = test_and_write_vectors[sharenum]
        if sharenum in shares:
            if not shares[sharenum].check_testv(testv):
                self.log("testv failed: [%d]: %r" % (sharenum, testv))
                testv_is_good = False
        # NOTE(review): the 'else:' that introduces this branch is
        # elided; the lines below handle shares we don't have yet.
            # compare the vectors against an empty share, in which all
            # reads return empty strings.
            if not EmptyShare().check_testv(testv):
                self.log("testv failed (empty): [%d] %r" % (sharenum,
                # NOTE(review): the continuation of this format call is
                # elided (line 441).
                testv_is_good = False

    # now gather the read vectors, before we do any writes
    # NOTE(review): the creation of 'read_data' is elided (line 446).
    for sharenum, share in shares.items():
        read_data[sharenum] = share.readv(read_vector)

    # NOTE(review): presumably guarded by "if testv_is_good:" with the
    # 'ownerid' bound nearby (lines 449-450, 455-456 elided) — the
    # writes below must only happen when every test vector passed.
    expire_time = time.time() + 31*24*60*60 # one month
    lease_info = LeaseInfo(ownerid,
                           renew_secret, cancel_secret,
                           expire_time, self.my_nodeid)

    # now apply the write vectors
    for sharenum in test_and_write_vectors:
        (testv, datav, new_length) = test_and_write_vectors[sharenum]
        # NOTE(review): presumably guarded by "if new_length == 0:"
        # (line 460 elided) — zero-length writev deletes the share.
        if sharenum in shares:
            shares[sharenum].unlink()
        if sharenum not in shares:
            # allocate a new share
            allocated_size = 2000 # arbitrary, really
            # NOTE(review): the continuation of this call (sharenum,
            # allocated size, owner) is elided (lines 468-470).
            share = self._allocate_slot_share(bucketdir, secrets,
            shares[sharenum] = share
        shares[sharenum].writev(datav, new_length)
        # and update the lease
        shares[sharenum].add_or_renew_lease(lease_info)

    # delete empty bucket directories
    # NOTE(review): the rmdir body and the read-only-slot handling are
    # elided (lines 479-482).
    if not os.listdir(bucketdir):
    self.add_latency("writev", time.time() - start)
    return (testv_is_good, read_data)
def _allocate_slot_share(self, bucketdir, secrets, sharenum,
                         allocated_size, owner_num=0):
    """Create a new (empty) mutable share file in bucketdir and return it."""
    (write_enabler, renew_secret, cancel_secret) = secrets
    my_nodeid = self.my_nodeid
    fileutil.make_dirs(bucketdir)
    filename = os.path.join(bucketdir, "%d" % sharenum)
    # NOTE(review): the continuation of this call and the 'return share'
    # are elided (source lines 493-495).
    share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
def remote_slot_readv(self, storage_index, shares, readv):
    """Read vectors from our mutable-slot shares.

    'shares' lists the wanted share numbers (an empty list means all).
    Returns a dict of sharenum -> read results.
    """
    # NOTE(review): several source lines are elided (497-498, 507-508,
    # 510, 512-513, 521-522): the 'start' timestamp, the early return
    # for a missing bucket dir, the creation of 'datavs', the filename
    # filter, and the final 'return datavs' — confirm against the full
    # file.
    si_s = si_b2a(storage_index)
    lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
                 facility="tahoe.storage", level=log.OPERATIONAL)
    si_dir = storage_index_to_dir(storage_index)
    # shares exist if there is a file for them
    bucketdir = os.path.join(self.sharedir, si_dir)
    if not os.path.isdir(bucketdir):
        self.add_latency("readv", time.time() - start)
    for sharenum_s in os.listdir(bucketdir):
        sharenum = int(sharenum_s)
        # an empty 'shares' request means "read every share"
        if sharenum in shares or not shares:
            filename = os.path.join(bucketdir, sharenum_s)
            msf = MutableShareFile(filename, self)
            datavs[sharenum] = msf.readv(readv)
    log.msg("returning shares %s" % (datavs.keys(),),
            facility="tahoe.storage", level=log.NOISY, parent=lp)
    self.add_latency("readv", time.time() - start)
def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
    # NOTE(review): the 'reason' parameter (source line 524) is elided,
    # as are the open() of the report file, some f.write calls and the
    # f.close() (lines 531, 536-539).
    """Record a client's corruption complaint: write a report file into
    corruption-advisories/ and log at SCARY level."""
    fileutil.make_dirs(self.corruption_advisory_dir)
    now = time_format.iso_utc(sep="T")
    si_s = si_b2a(storage_index)
    # windows can't handle colons in the filename
    # NOTE(review): the replace() is applied to the whole joined path,
    # so a drive-letter colon ("C:") would also be stripped — confirm
    # this is intended.
    fn = os.path.join(self.corruption_advisory_dir,
                      "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
    f.write("report: Share Corruption\n")
    f.write("type: %s\n" % share_type)
    f.write("storage_index: %s\n" % si_s)
    f.write("share_number: %d\n" % shnum)
    log.msg(format=("client claims corruption in (%(share_type)s) " +
                    "%(si)s-%(shnum)d: %(reason)s"),
            share_type=share_type, si=si_s, shnum=shnum, reason=reason,
            level=log.SCARY, umid="SGx2fA")