1 import os, re, weakref, struct, time
3 from foolscap.api import Referenceable
4 from twisted.application import service
6 from zope.interface import implements
7 from allmydata.interfaces import RIStorageServer, IStatsProducer
8 from allmydata.util import fileutil, idlib, log, time_format
9 import allmydata # for __full_version__
11 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
12 _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
13 from allmydata.storage.lease import LeaseInfo
14 from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
15 create_mutable_sharefile
16 from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
17 from allmydata.storage.crawler import BucketCountingCrawler
18 from allmydata.storage.expirer import LeaseCheckingCrawler
21 # storage/shares/incoming
22 # incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
23 # be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
24 # storage/shares/$START/$STORAGEINDEX
25 # storage/shares/$START/$STORAGEINDEX/$SHARENUM
# Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
# base-32 chars).
30 # $SHARENUM matches this regex:
NUM_RE=re.compile("^[0-9]+$")  # valid share-number filenames: one or more ASCII digits
class StorageServer(service.MultiService, Referenceable):
    """Foolscap-reachable storage server.

    Stores share files under storedir/shares (immutable shares and mutable
    slots), manages leases on them, and reports operational statistics via
    the IStatsProducer interface.
    """
    implements(RIStorageServer, IStatsProducer)

    # Crawler class used for lease checking/expiration; a class attribute so
    # subclasses (e.g. in tests) can substitute their own.
    LeaseCheckerClass = LeaseCheckingCrawler
    def __init__(self, storedir, nodeid, reserved_space=0,
                 discard_storage=False, readonly_storage=False,
                 expiration_enabled=False,
                 expiration_mode="age",
                 expiration_override_lease_duration=None,
                 expiration_cutoff_date=None,
                 expiration_sharetypes=("mutable", "immutable")):
        """Create a StorageServer rooted at 'storedir'.

        storedir: base directory. Shares live under storedir/shares, and
                  crawler state files live directly in storedir.
        nodeid: our 20-byte binary node identifier (asserted below).
        reserved_space: number of bytes to keep unused on the disk that
                        holds the shares.
        discard_storage: if True, incoming share data is discarded
                         (recorded as self.no_storage).
        readonly_storage: if True, this server accepts no new shares.
        expiration_*: passed straight through to the lease-checking crawler.
        """
        service.MultiService.__init__(self)
        assert isinstance(nodeid, str)
        assert len(nodeid) == 20
        self.my_nodeid = nodeid
        self.storedir = storedir
        sharedir = os.path.join(storedir, "shares")
        fileutil.make_dirs(sharedir)
        self.sharedir = sharedir
        # we don't actually create the corruption-advisory dir until necessary
        self.corruption_advisory_dir = os.path.join(storedir,
                                                    "corruption-advisories")
        self.reserved_space = int(reserved_space)
        self.no_storage = discard_storage
        self.readonly_storage = readonly_storage
        # NOTE(review): 'stats_provider' is not among the parameters visible
        # in this signature -- confirm it is accepted (e.g. a keyword arg not
        # shown here), otherwise this line raises NameError.
        self.stats_provider = stats_provider
        if self.stats_provider:
            self.stats_provider.register_producer(self)
        self.incomingdir = os.path.join(sharedir, 'incoming')
        # discard partially-uploaded shares from a previous run, then
        # recreate the (now empty) incoming directory
        self._clean_incomplete()
        fileutil.make_dirs(self.incomingdir)
        # weak keys: a BucketWriter drops out of this set automatically once
        # nothing else references it
        self._active_writers = weakref.WeakKeyDictionary()
        log.msg("StorageServer created", facility="tahoe.storage")

        if self.get_available_space() is None:
            # NOTE(review): 'umin' looks like a typo for the usual log.msg
            # 'umid=' keyword -- confirm against the foolscap logging API.
            log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
                    umin="0wZ27w", level=log.UNUSUAL)

        # per-category buffers of recent operation latencies, fed by
        # add_latency() and summarized by get_latencies()
        self.latencies = {"allocate": [], # immutable
                          "writev": [], # mutable
                          "add-lease": [], # both
        self.add_bucket_counter()

        statefile = os.path.join(self.storedir, "lease_checker.state")
        historyfile = os.path.join(self.storedir, "lease_checker.history")
        klass = self.LeaseCheckerClass
        self.lease_checker = klass(self, statefile, historyfile,
                                   expiration_enabled, expiration_mode,
                                   expiration_override_lease_duration,
                                   expiration_cutoff_date,
                                   expiration_sharetypes)
        self.lease_checker.setServiceParent(self)
        # identify this server by its abbreviated base32 nodeid, for logs
        return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
102 def add_bucket_counter(self):
103 statefile = os.path.join(self.storedir, "bucket_counter.state")
104 self.bucket_counter = BucketCountingCrawler(self, statefile)
105 self.bucket_counter.setServiceParent(self)
107 def count(self, name, delta=1):
108 if self.stats_provider:
109 self.stats_provider.count("storage_server." + name, delta)
    def add_latency(self, category, latency):
        """Record one operation's elapsed time (in seconds, a float) under
        'category'; only the newest 1000 samples per category are kept."""
        a = self.latencies[category]
        # keep only the most recent 1000 samples
        self.latencies[category] = a[-1000:]
    def get_latencies(self):
        """Return a dict, indexed by category, that contains a dict of
        latency numbers for each category. If there are sufficient samples
        for unambiguous interpretation, each dict will contain the
        following keys: mean, 01_0_percentile, 10_0_percentile,
        50_0_percentile (median), 90_0_percentile, 95_0_percentile,
        99_0_percentile, 99_9_percentile. If there are insufficient
        samples for a given percentile to be interpreted unambiguously
        that percentile will be reported as None. If no samples have been
        collected for the given category, then that category name will
        not be present in the return value. """
        # note that Amazon's Dynamo paper says they use 99.9% percentile.
        for category in self.latencies:
            if not self.latencies[category]:
            # copy the sample list so we can work on a stable snapshot
            samples = self.latencies[category][:]
            stats["samplesize"] = count
            stats["mean"] = sum(samples) / count
            # (fraction, result-dict key, minimum sample count for that
            # percentile to be reported at all)
            orderstatlist = [(0.01, "01_0_percentile", 100), (0.1, "10_0_percentile", 10),\
                             (0.50, "50_0_percentile", 10), (0.90, "90_0_percentile", 10),\
                             (0.95, "95_0_percentile", 20), (0.99, "99_0_percentile", 100),\
                             (0.999, "99_9_percentile", 1000)]

            for percentile, percentilestring, minnumtoobserve in orderstatlist:
                if count >= minnumtoobserve:
                    # order statistic: the sample at the percentile's index
                    stats[percentilestring] = samples[int(percentile*count)]
                    stats[percentilestring] = None

            output[category] = stats
157 def log(self, *args, **kwargs):
158 if "facility" not in kwargs:
159 kwargs["facility"] = "tahoe.storage"
160 return log.msg(*args, **kwargs)
    def _clean_incomplete(self):
        """Discard any partially-written shares left in incoming/ by a
        previous run; the directory itself is recreated by the caller."""
        fileutil.rm_dir(self.incomingdir)
        # remember: RIStatsProvider requires that our return dict
        # contains numeric values.
        stats = { 'storage_server.allocated': self.allocated_size(), }
        stats['storage_server.reserved_space'] = self.reserved_space
        # flatten the per-category latency dicts into dotted stat names
        for category,ld in self.get_latencies().items():
            for name,v in ld.items():
                stats['storage_server.latencies.%s.%s' % (category, name)] = v

            disk = fileutil.get_disk_stats(self.sharedir, self.reserved_space)
            # writeable only while there is unreserved space left
            writeable = disk['avail'] > 0

            # spacetime predictors should use disk_avail / (d(disk_used)/dt)
            stats['storage_server.disk_total'] = disk['total']
            stats['storage_server.disk_used'] = disk['used']
            stats['storage_server.disk_free_for_root'] = disk['free_for_root']
            stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
            stats['storage_server.disk_avail'] = disk['avail']
        except AttributeError:
        except EnvironmentError:
            log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)

        # readonly servers advertise zero available space regardless of disk
        if self.readonly_storage:
            stats['storage_server.disk_avail'] = 0

        stats['storage_server.accepting_immutable_shares'] = int(writeable)
        s = self.bucket_counter.get_state()
        bucket_count = s.get("last-complete-bucket-count")
        stats['storage_server.total_bucket_count'] = bucket_count
    def get_available_space(self):
        """Returns available space for share storage in bytes, or None if no
        API to get this information is available."""
        if self.readonly_storage:
        # NOTE(review): the branch above appears to be missing its body
        # (a readonly server would be expected to 'return 0') -- confirm.
        # honor the admin's reservation by subtracting reserved_space
        return fileutil.get_available_space(self.sharedir, self.reserved_space)
    def allocated_size(self):
        """Return the total bytes promised to currently-active
        BucketWriters (uploads in progress)."""
        # NOTE(review): 'space' is neither initialized nor returned in the
        # visible code -- confirm (expected 'space = 0' / 'return space').
        for bw in self._active_writers:
            space += bw.allocated_size()
    def remote_get_version(self):
        """RIStorageServer: advertise our protocol capabilities and limits
        to clients."""
        remaining_space = self.get_available_space()
        if remaining_space is None:
            # We're on a platform that has no API to get disk stats.
            remaining_space = 2**64

        # clients cap their immutable share size at maximum-immutable-share-size
        version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                    { "maximum-immutable-share-size": remaining_space,
                      "tolerates-immutable-read-overrun": True,
                      "delete-mutable-shares-with-zero-length-writev": True,
                    "application-version": str(allmydata.__full_version__),
    def remote_allocate_buckets(self, storage_index,
                                renew_secret, cancel_secret,
                                sharenums, allocated_size,
                                canary, owner_num=0):
        """RIStorageServer: reserve space for immutable shares.

        Returns (alreadygot, bucketwriters): the share numbers we already
        hold for this storage index, and a dict of BucketWriters for the
        newly-allocated shares. Shares we cannot fit (given reserved_space
        and outstanding allocations) get neither.
        """
        # owner_num is not for clients to set, but rather it should be
        # curried into the PersonalStorageServer instance that is dedicated
        # to a particular owner.
        self.count("allocate")
        bucketwriters = {} # k: shnum, v: BucketWriter
        si_dir = storage_index_to_dir(storage_index)
        si_s = si_b2a(storage_index)

        log.msg("storage: allocate_buckets %s" % si_s)

        # in this implementation, the lease information (including secrets)
        # goes into the share files themselves. It could also be put into a
        # separate database. Note that the lease should not be added until
        # the BucketWriter has been closed.
        expire_time = time.time() + 31*24*60*60
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        max_space_per_bucket = allocated_size

        remaining_space = self.get_available_space()
        # None means "no disk-stats API": treat space as unlimited
        limited = remaining_space is not None
        # this is a bit conservative, since some of this allocated_size()
        # has already been written to disk, where it will show up in
        # get_available_space.
        remaining_space -= self.allocated_size()
        # self.readonly_storage causes remaining_space <= 0

        # fill alreadygot with all shares that we have, not just the ones
        # they asked about: this will save them a lot of work. Add or update
        # leases for all of them: if they want us to hold shares for this
        # file, they'll want us to hold leases for this file.
        for (shnum, fn) in self._get_bucket_shares(storage_index):
            alreadygot.add(shnum)
            sf.add_or_renew_lease(lease_info)

        for shnum in sharenums:
            incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
            finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
            if os.path.exists(finalhome):
                # great! we already have it. easy.
            elif os.path.exists(incominghome):
                # Note that we don't create BucketWriters for shnums that
                # have a partial share (in incoming/), so if a second upload
                # occurs while the first is still in progress, the second
                # uploader will use different storage servers.
            elif (not limited) or (remaining_space >= max_space_per_bucket):
                # ok! we need to create the new share file.
                bw = BucketWriter(self, incominghome, finalhome,
                                  max_space_per_bucket, lease_info, canary)
                # NOTE(review): this flag is presumably meant to be guarded
                # by 'if self.no_storage:' (discard mode) -- confirm.
                bw.throw_out_all_data = True
                bucketwriters[shnum] = bw
                self._active_writers[bw] = 1
                remaining_space -= max_space_per_bucket
            # bummer! not enough space to accept this bucket

            fileutil.make_dirs(os.path.join(self.sharedir, si_dir))

        self.add_latency("allocate", time.time() - start)
        return alreadygot, bucketwriters
    def _iter_share_files(self, storage_index):
        """Yield a share object (MutableShareFile or ShareFile) for each
        recognized share file we hold under 'storage_index'; files with an
        unknown header are skipped."""
        for shnum, filename in self._get_bucket_shares(storage_index):
            f = open(filename, 'rb')
            # sniff the on-disk container format from the file header
            if header[:32] == MutableShareFile.MAGIC:
                sf = MutableShareFile(filename, self)
                # note: if the share has been migrated, the renew_lease()
                # call will throw an exception, with information to help the
                # client update the lease.
            elif header[:4] == struct.pack(">L", 1):
                # immutable share container, version 1
                sf = ShareFile(filename)
                continue # non-sharefile
    def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
        # RIStorageServer: add (or renew) a one-month lease on every share
        # we hold for this storage index; a no-op if we hold no shares.
        self.count("add-lease")
        new_expire_time = time.time() + 31*24*60*60
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               new_expire_time, self.my_nodeid)
        for sf in self._iter_share_files(storage_index):
            sf.add_or_renew_lease(lease_info)
        self.add_latency("add-lease", time.time() - start)
    def remote_renew_lease(self, storage_index, renew_secret):
        """RIStorageServer: extend an existing lease (matched by its
        renew_secret) on all shares of this bucket by one month from now.
        Raises IndexError if no matching lease was found."""
        new_expire_time = time.time() + 31*24*60*60
        found_buckets = False
        for sf in self._iter_share_files(storage_index):
            # NOTE(review): found_buckets is never set True in the visible
            # loop body -- confirm ('found_buckets = True' expected here).
            sf.renew_lease(renew_secret, new_expire_time)
        self.add_latency("renew", time.time() - start)
        if not found_buckets:
            raise IndexError("no such lease to renew")
348 def bucket_writer_closed(self, bw, consumed_size):
349 if self.stats_provider:
350 self.stats_provider.count('storage_server.bytes_added', consumed_size)
351 del self._active_writers[bw]
    def _get_bucket_shares(self, storage_index):
        """Generate (shnum, pathname) tuples for files that hold shares for
        this storage_index. In each tuple, 'shnum' will always be the
        integer form of the last component of 'pathname'. Yields nothing if
        the bucket directory does not exist."""
        storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
        for f in os.listdir(storagedir):
            filename = os.path.join(storagedir, f)
            yield (int(f), filename)
        # Commonly caused by there being no buckets at all.
    def remote_get_buckets(self, storage_index):
        """RIStorageServer: return a dict mapping share number to a
        BucketReader for each immutable share we hold under this storage
        index (empty dict if we hold none)."""
        si_s = si_b2a(storage_index)
        log.msg("storage: get_buckets %s" % si_s)
        bucketreaders = {} # k: sharenum, v: BucketReader
        for shnum, filename in self._get_bucket_shares(storage_index):
            bucketreaders[shnum] = BucketReader(self, filename,
                                                storage_index, shnum)
        self.add_latency("get", time.time() - start)
    def get_leases(self, storage_index):
        """Provide an iterator that yields all of the leases attached to this
        bucket. Each lease is returned as a LeaseInfo instance.

        This method is not for client use.
        # since all shares get the same lease data, we just grab the leases
        # from the first share
            shnum, filename = self._get_bucket_shares(storage_index).next()
            sf = ShareFile(filename)
            return sf.get_leases()
        except StopIteration:
    def remote_slot_testv_and_readv_and_writev(self, storage_index,
                                               test_and_write_vectors,
        # RIStorageServer: the mutable-share test-and-set operation.
        # Checks the write-enabler, evaluates test vectors against existing
        # shares, gathers read vectors, and (if all tests passed) applies
        # the write vectors and refreshes leases. Returns
        # (testv_is_good, read_data).
        si_s = si_b2a(storage_index)
        log.msg("storage: slot_writev %s" % si_s)
        si_dir = storage_index_to_dir(storage_index)
        (write_enabler, renew_secret, cancel_secret) = secrets
        # shares exist if there is a file for them
        bucketdir = os.path.join(self.sharedir, si_dir)
        if os.path.isdir(bucketdir):
            for sharenum_s in os.listdir(bucketdir):
                sharenum = int(sharenum_s)
                filename = os.path.join(bucketdir, sharenum_s)
                msf = MutableShareFile(filename, self)
                # raises BadWriteEnablerError-style failure on mismatch,
                # aborting the whole operation before any writes happen
                msf.check_write_enabler(write_enabler, si_s)
                shares[sharenum] = msf
        # write_enabler is good for all existing shares.

        # Now evaluate test vectors.
        for sharenum in test_and_write_vectors:
            (testv, datav, new_length) = test_and_write_vectors[sharenum]
            if sharenum in shares:
                if not shares[sharenum].check_testv(testv):
                    self.log("testv failed: [%d]: %r" % (sharenum, testv))
                    testv_is_good = False
                # compare the vectors against an empty share, in which all
                # reads return empty strings.
                if not EmptyShare().check_testv(testv):
                    self.log("testv failed (empty): [%d] %r" % (sharenum,
                    testv_is_good = False

        # now gather the read vectors, before we do any writes
        for sharenum, share in shares.items():
            read_data[sharenum] = share.readv(read_vector)

        expire_time = time.time() + 31*24*60*60 # one month
        lease_info = LeaseInfo(ownerid,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        # now apply the write vectors
        for sharenum in test_and_write_vectors:
            (testv, datav, new_length) = test_and_write_vectors[sharenum]
            if sharenum in shares:
                shares[sharenum].unlink()
            if sharenum not in shares:
                # allocate a new share
                allocated_size = 2000 # arbitrary, really
                share = self._allocate_slot_share(bucketdir, secrets,
                shares[sharenum] = share
                shares[sharenum].writev(datav, new_length)
                # and update the lease
                shares[sharenum].add_or_renew_lease(lease_info)

        # delete empty bucket directories
        if not os.listdir(bucketdir):

        self.add_latency("writev", time.time() - start)
        return (testv_is_good, read_data)
    def _allocate_slot_share(self, bucketdir, secrets, sharenum,
                             allocated_size, owner_num=0):
        """Create a brand-new mutable share file for 'sharenum' inside
        'bucketdir', embedding our nodeid and the write-enabler in its
        header."""
        (write_enabler, renew_secret, cancel_secret) = secrets
        my_nodeid = self.my_nodeid
        fileutil.make_dirs(bucketdir)
        filename = os.path.join(bucketdir, "%d" % sharenum)
        share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
    def remote_slot_readv(self, storage_index, shares, readv):
        """RIStorageServer: apply read vector 'readv' to the listed mutable
        shares -- or to every share we hold, when 'shares' is empty."""
        si_s = si_b2a(storage_index)
        lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
                     facility="tahoe.storage", level=log.OPERATIONAL)
        si_dir = storage_index_to_dir(storage_index)
        # shares exist if there is a file for them
        bucketdir = os.path.join(self.sharedir, si_dir)
        if not os.path.isdir(bucketdir):
            self.add_latency("readv", time.time() - start)
        for sharenum_s in os.listdir(bucketdir):
            sharenum = int(sharenum_s)
            # an empty 'shares' list means "read all shares present"
            if sharenum in shares or not shares:
                filename = os.path.join(bucketdir, sharenum_s)
                msf = MutableShareFile(filename, self)
                datavs[sharenum] = msf.readv(readv)
        log.msg("returning shares %s" % (datavs.keys(),),
                facility="tahoe.storage", level=log.NOISY, parent=lp)
        self.add_latency("readv", time.time() - start)
    def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
        # RIStorageServer: a client observed corruption in one of our
        # shares. Write a timestamped report file into the
        # corruption-advisories directory and log loudly.
        fileutil.make_dirs(self.corruption_advisory_dir)
        now = time_format.iso_utc(sep="T")
        si_s = si_b2a(storage_index)
        # windows can't handle colons in the filename
        fn = os.path.join(self.corruption_advisory_dir,
                          "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
        f.write("report: Share Corruption\n")
        f.write("type: %s\n" % share_type)
        f.write("storage_index: %s\n" % si_s)
        f.write("share_number: %d\n" % shnum)
        log.msg(format=("client claims corruption in (%(share_type)s) " +
                        "%(si)s-%(shnum)d: %(reason)s"),
                share_type=share_type, si=si_s, shnum=shnum, reason=reason,
                level=log.SCARY, umid="SGx2fA")