1 import os, re, weakref, struct, time
3 from foolscap.api import Referenceable
4 from twisted.application import service
6 from zope.interface import implements
7 from allmydata.interfaces import RIStorageServer, IStatsProducer
8 from allmydata.util import fileutil, idlib, log, time_format
9 import allmydata # for __full_version__
11 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
12 _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
13 from allmydata.storage.lease import LeaseInfo
14 from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
15 create_mutable_sharefile
16 from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
17 from allmydata.storage.crawler import BucketCountingCrawler
18 from allmydata.storage.expirer import LeaseCheckingCrawler
21 # storage/shares/incoming
22 # incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
23 # be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
24 # storage/shares/$START/$STORAGEINDEX
25 # storage/shares/$START/$STORAGEINDEX/$SHARENUM
# Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2 base-32 chars).
30 # $SHARENUM matches this regex:
# Share filenames under a bucket directory are pure decimal digit strings.
NUM_RE = re.compile(r"^[0-9]+$")
class StorageServer(service.MultiService, Referenceable):
    """A local storage server: stores immutable and mutable shares on disk
    under storedir/shares/, tracks leases, and answers the RIStorageServer
    remote interface."""
    implements(RIStorageServer, IStatsProducer)
    # service name, so parents can look this child up by name
    name = 'storage'
    # class attribute so unit tests can substitute their own crawler class
    LeaseCheckerClass = LeaseCheckingCrawler
40 def __init__(self, storedir, nodeid, reserved_space=0,
41 discard_storage=False, readonly_storage=False,
43 expiration_enabled=False,
44 expiration_mode="age",
45 expiration_override_lease_duration=None,
46 expiration_cutoff_date=None,
47 expiration_sharetypes=("mutable", "immutable")):
48 service.MultiService.__init__(self)
49 assert isinstance(nodeid, str)
50 assert len(nodeid) == 20
51 self.my_nodeid = nodeid
52 self.storedir = storedir
53 sharedir = os.path.join(storedir, "shares")
54 fileutil.make_dirs(sharedir)
55 self.sharedir = sharedir
56 # we don't actually create the corruption-advisory dir until necessary
57 self.corruption_advisory_dir = os.path.join(storedir,
58 "corruption-advisories")
59 self.reserved_space = int(reserved_space)
60 self.no_storage = discard_storage
61 self.readonly_storage = readonly_storage
62 self.stats_provider = stats_provider
63 if self.stats_provider:
64 self.stats_provider.register_producer(self)
65 self.incomingdir = os.path.join(sharedir, 'incoming')
66 self._clean_incomplete()
67 fileutil.make_dirs(self.incomingdir)
68 self._active_writers = weakref.WeakKeyDictionary()
69 log.msg("StorageServer created", facility="tahoe.storage")
72 if self.get_available_space() is None:
73 log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
74 umin="0wZ27w", level=log.UNUSUAL)
76 self.latencies = {"allocate": [], # immutable
81 "writev": [], # mutable
83 "add-lease": [], # both
87 self.add_bucket_counter()
89 statefile = os.path.join(self.storedir, "lease_checker.state")
90 historyfile = os.path.join(self.storedir, "lease_checker.history")
91 klass = self.LeaseCheckerClass
92 self.lease_checker = klass(self, statefile, historyfile,
93 expiration_enabled, expiration_mode,
94 expiration_override_lease_duration,
95 expiration_cutoff_date,
96 expiration_sharetypes)
97 self.lease_checker.setServiceParent(self)
100 return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
102 def add_bucket_counter(self):
103 statefile = os.path.join(self.storedir, "bucket_counter.state")
104 self.bucket_counter = BucketCountingCrawler(self, statefile)
105 self.bucket_counter.setServiceParent(self)
107 def count(self, name, delta=1):
108 if self.stats_provider:
109 self.stats_provider.count("storage_server." + name, delta)
111 def add_latency(self, category, latency):
112 a = self.latencies[category]
115 self.latencies[category] = a[-1000:]
117 def get_latencies(self):
118 """Return a dict, indexed by category, that contains a dict of
119 latency numbers for each category. If there are sufficient samples
120 for unambiguous interpretation, each dict will contain the
121 following keys: mean, 01_0_percentile, 10_0_percentile,
122 50_0_percentile (median), 90_0_percentile, 95_0_percentile,
123 99_0_percentile, 99_9_percentile. If there are insufficient
124 samples for a given percentile to be interpreted unambiguously
125 that percentile will be reported as None. If no samples have been
126 collected for the given category, then that category name will
127 not be present in the return value. """
128 # note that Amazon's Dynamo paper says they use 99.9% percentile.
130 for category in self.latencies:
131 if not self.latencies[category]:
134 samples = self.latencies[category][:]
136 stats["samplesize"] = count
139 stats["mean"] = sum(samples) / count
143 orderstatlist = [(0.01, "01_0_percentile", 100), (0.1, "10_0_percentile", 10),\
144 (0.50, "50_0_percentile", 10), (0.90, "90_0_percentile", 10),\
145 (0.95, "95_0_percentile", 20), (0.99, "99_0_percentile", 100),\
146 (0.999, "99_9_percentile", 1000)]
148 for percentile, percentilestring, minnumtoobserve in orderstatlist:
149 if count >= minnumtoobserve:
150 stats[percentilestring] = samples[int(percentile*count)]
152 stats[percentilestring] = None
154 output[category] = stats
157 def log(self, *args, **kwargs):
158 if "facility" not in kwargs:
159 kwargs["facility"] = "tahoe.storage"
160 return log.msg(*args, **kwargs)
162 def _clean_incomplete(self):
163 fileutil.rm_dir(self.incomingdir)
166 # remember: RIStatsProvider requires that our return dict
167 # contains numeric values.
168 stats = { 'storage_server.allocated': self.allocated_size(), }
169 stats['storage_server.reserved_space'] = self.reserved_space
170 for category,ld in self.get_latencies().items():
171 for name,v in ld.items():
172 stats['storage_server.latencies.%s.%s' % (category, name)] = v
175 disk = fileutil.get_disk_stats(self.sharedir, self.reserved_space)
176 writeable = disk['avail'] > 0
178 # spacetime predictors should use disk_avail / (d(disk_used)/dt)
179 stats['storage_server.disk_total'] = disk['total']
180 stats['storage_server.disk_used'] = disk['used']
181 stats['storage_server.disk_free_for_root'] = disk['free_for_root']
182 stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
183 stats['storage_server.disk_avail'] = disk['avail']
184 except AttributeError:
186 except EnvironmentError:
187 log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
190 if self.readonly_storage:
191 stats['storage_server.disk_avail'] = 0
194 stats['storage_server.accepting_immutable_shares'] = int(writeable)
195 s = self.bucket_counter.get_state()
196 bucket_count = s.get("last-complete-bucket-count")
198 stats['storage_server.total_bucket_count'] = bucket_count
201 def get_available_space(self):
202 """Returns available space for share storage in bytes, or None if no
203 API to get this information is available."""
205 if self.readonly_storage:
207 return fileutil.get_available_space(self.sharedir, self.reserved_space)
209 def allocated_size(self):
211 for bw in self._active_writers:
212 space += bw.allocated_size()
215 def remote_get_version(self):
216 remaining_space = self.get_available_space()
217 if remaining_space is None:
218 # We're on a platform that has no API to get disk stats.
219 remaining_space = 2**64
221 version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
222 { "maximum-immutable-share-size": remaining_space,
223 "tolerates-immutable-read-overrun": True,
224 "delete-mutable-shares-with-zero-length-writev": True,
225 "prevents-read-past-end-of-share-data": True,
227 "application-version": str(allmydata.__full_version__),
231 def remote_allocate_buckets(self, storage_index,
232 renew_secret, cancel_secret,
233 sharenums, allocated_size,
234 canary, owner_num=0):
235 # owner_num is not for clients to set, but rather it should be
236 # curried into the PersonalStorageServer instance that is dedicated
237 # to a particular owner.
239 self.count("allocate")
241 bucketwriters = {} # k: shnum, v: BucketWriter
242 si_dir = storage_index_to_dir(storage_index)
243 si_s = si_b2a(storage_index)
245 log.msg("storage: allocate_buckets %s" % si_s)
247 # in this implementation, the lease information (including secrets)
248 # goes into the share files themselves. It could also be put into a
249 # separate database. Note that the lease should not be added until
250 # the BucketWriter has been closed.
251 expire_time = time.time() + 31*24*60*60
252 lease_info = LeaseInfo(owner_num,
253 renew_secret, cancel_secret,
254 expire_time, self.my_nodeid)
256 max_space_per_bucket = allocated_size
258 remaining_space = self.get_available_space()
259 limited = remaining_space is not None
261 # this is a bit conservative, since some of this allocated_size()
262 # has already been written to disk, where it will show up in
263 # get_available_space.
264 remaining_space -= self.allocated_size()
265 # self.readonly_storage causes remaining_space <= 0
267 # fill alreadygot with all shares that we have, not just the ones
268 # they asked about: this will save them a lot of work. Add or update
269 # leases for all of them: if they want us to hold shares for this
270 # file, they'll want us to hold leases for this file.
271 for (shnum, fn) in self._get_bucket_shares(storage_index):
272 alreadygot.add(shnum)
274 sf.add_or_renew_lease(lease_info)
276 for shnum in sharenums:
277 incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
278 finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
279 if os.path.exists(finalhome):
280 # great! we already have it. easy.
282 elif os.path.exists(incominghome):
283 # Note that we don't create BucketWriters for shnums that
284 # have a partial share (in incoming/), so if a second upload
285 # occurs while the first is still in progress, the second
286 # uploader will use different storage servers.
288 elif (not limited) or (remaining_space >= max_space_per_bucket):
289 # ok! we need to create the new share file.
290 bw = BucketWriter(self, incominghome, finalhome,
291 max_space_per_bucket, lease_info, canary)
293 bw.throw_out_all_data = True
294 bucketwriters[shnum] = bw
295 self._active_writers[bw] = 1
297 remaining_space -= max_space_per_bucket
299 # bummer! not enough space to accept this bucket
303 fileutil.make_dirs(os.path.join(self.sharedir, si_dir))
305 self.add_latency("allocate", time.time() - start)
306 return alreadygot, bucketwriters
308 def _iter_share_files(self, storage_index):
309 for shnum, filename in self._get_bucket_shares(storage_index):
310 f = open(filename, 'rb')
313 if header[:32] == MutableShareFile.MAGIC:
314 sf = MutableShareFile(filename, self)
315 # note: if the share has been migrated, the renew_lease()
316 # call will throw an exception, with information to help the
317 # client update the lease.
318 elif header[:4] == struct.pack(">L", 1):
319 sf = ShareFile(filename)
321 continue # non-sharefile
324 def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
327 self.count("add-lease")
328 new_expire_time = time.time() + 31*24*60*60
329 lease_info = LeaseInfo(owner_num,
330 renew_secret, cancel_secret,
331 new_expire_time, self.my_nodeid)
332 for sf in self._iter_share_files(storage_index):
333 sf.add_or_renew_lease(lease_info)
334 self.add_latency("add-lease", time.time() - start)
337 def remote_renew_lease(self, storage_index, renew_secret):
340 new_expire_time = time.time() + 31*24*60*60
341 found_buckets = False
342 for sf in self._iter_share_files(storage_index):
344 sf.renew_lease(renew_secret, new_expire_time)
345 self.add_latency("renew", time.time() - start)
346 if not found_buckets:
347 raise IndexError("no such lease to renew")
349 def bucket_writer_closed(self, bw, consumed_size):
350 if self.stats_provider:
351 self.stats_provider.count('storage_server.bytes_added', consumed_size)
352 del self._active_writers[bw]
354 def _get_bucket_shares(self, storage_index):
355 """Return a list of (shnum, pathname) tuples for files that hold
356 shares for this storage_index. In each tuple, 'shnum' will always be
357 the integer form of the last component of 'pathname'."""
358 storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
360 for f in os.listdir(storagedir):
362 filename = os.path.join(storagedir, f)
363 yield (int(f), filename)
365 # Commonly caused by there being no buckets at all.
368 def remote_get_buckets(self, storage_index):
371 si_s = si_b2a(storage_index)
372 log.msg("storage: get_buckets %s" % si_s)
373 bucketreaders = {} # k: sharenum, v: BucketReader
374 for shnum, filename in self._get_bucket_shares(storage_index):
375 bucketreaders[shnum] = BucketReader(self, filename,
376 storage_index, shnum)
377 self.add_latency("get", time.time() - start)
380 def get_leases(self, storage_index):
381 """Provide an iterator that yields all of the leases attached to this
382 bucket. Each lease is returned as a LeaseInfo instance.
384 This method is not for client use.
387 # since all shares get the same lease data, we just grab the leases
388 # from the first share
390 shnum, filename = self._get_bucket_shares(storage_index).next()
391 sf = ShareFile(filename)
392 return sf.get_leases()
393 except StopIteration:
396 def remote_slot_testv_and_readv_and_writev(self, storage_index,
398 test_and_write_vectors,
402 si_s = si_b2a(storage_index)
403 log.msg("storage: slot_writev %s" % si_s)
404 si_dir = storage_index_to_dir(storage_index)
405 (write_enabler, renew_secret, cancel_secret) = secrets
406 # shares exist if there is a file for them
407 bucketdir = os.path.join(self.sharedir, si_dir)
409 if os.path.isdir(bucketdir):
410 for sharenum_s in os.listdir(bucketdir):
412 sharenum = int(sharenum_s)
415 filename = os.path.join(bucketdir, sharenum_s)
416 msf = MutableShareFile(filename, self)
417 msf.check_write_enabler(write_enabler, si_s)
418 shares[sharenum] = msf
419 # write_enabler is good for all existing shares.
421 # Now evaluate test vectors.
423 for sharenum in test_and_write_vectors:
424 (testv, datav, new_length) = test_and_write_vectors[sharenum]
425 if sharenum in shares:
426 if not shares[sharenum].check_testv(testv):
427 self.log("testv failed: [%d]: %r" % (sharenum, testv))
428 testv_is_good = False
431 # compare the vectors against an empty share, in which all
432 # reads return empty strings.
433 if not EmptyShare().check_testv(testv):
434 self.log("testv failed (empty): [%d] %r" % (sharenum,
436 testv_is_good = False
439 # now gather the read vectors, before we do any writes
441 for sharenum, share in shares.items():
442 read_data[sharenum] = share.readv(read_vector)
445 expire_time = time.time() + 31*24*60*60 # one month
446 lease_info = LeaseInfo(ownerid,
447 renew_secret, cancel_secret,
448 expire_time, self.my_nodeid)
451 # now apply the write vectors
452 for sharenum in test_and_write_vectors:
453 (testv, datav, new_length) = test_and_write_vectors[sharenum]
455 if sharenum in shares:
456 shares[sharenum].unlink()
458 if sharenum not in shares:
459 # allocate a new share
460 allocated_size = 2000 # arbitrary, really
461 share = self._allocate_slot_share(bucketdir, secrets,
465 shares[sharenum] = share
466 shares[sharenum].writev(datav, new_length)
467 # and update the lease
468 shares[sharenum].add_or_renew_lease(lease_info)
471 # delete empty bucket directories
472 if not os.listdir(bucketdir):
477 self.add_latency("writev", time.time() - start)
478 return (testv_is_good, read_data)
480 def _allocate_slot_share(self, bucketdir, secrets, sharenum,
481 allocated_size, owner_num=0):
482 (write_enabler, renew_secret, cancel_secret) = secrets
483 my_nodeid = self.my_nodeid
484 fileutil.make_dirs(bucketdir)
485 filename = os.path.join(bucketdir, "%d" % sharenum)
486 share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
490 def remote_slot_readv(self, storage_index, shares, readv):
493 si_s = si_b2a(storage_index)
494 lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
495 facility="tahoe.storage", level=log.OPERATIONAL)
496 si_dir = storage_index_to_dir(storage_index)
497 # shares exist if there is a file for them
498 bucketdir = os.path.join(self.sharedir, si_dir)
499 if not os.path.isdir(bucketdir):
500 self.add_latency("readv", time.time() - start)
503 for sharenum_s in os.listdir(bucketdir):
505 sharenum = int(sharenum_s)
508 if sharenum in shares or not shares:
509 filename = os.path.join(bucketdir, sharenum_s)
510 msf = MutableShareFile(filename, self)
511 datavs[sharenum] = msf.readv(readv)
512 log.msg("returning shares %s" % (datavs.keys(),),
513 facility="tahoe.storage", level=log.NOISY, parent=lp)
514 self.add_latency("readv", time.time() - start)
517 def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
519 fileutil.make_dirs(self.corruption_advisory_dir)
520 now = time_format.iso_utc(sep="T")
521 si_s = si_b2a(storage_index)
522 # windows can't handle colons in the filename
523 fn = os.path.join(self.corruption_advisory_dir,
524 "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
526 f.write("report: Share Corruption\n")
527 f.write("type: %s\n" % share_type)
528 f.write("storage_index: %s\n" % si_s)
529 f.write("share_number: %d\n" % shnum)
534 log.msg(format=("client claims corruption in (%(share_type)s) " +
535 "%(si)s-%(shnum)d: %(reason)s"),
536 share_type=share_type, si=si_s, shnum=shnum, reason=reason,
537 level=log.SCARY, umid="SGx2fA")