1 import os, re, weakref, struct, time
3 from foolscap.api import Referenceable
4 from twisted.application import service
6 from zope.interface import implements
7 from allmydata.interfaces import RIStorageServer, IStatsProducer
8 from allmydata.util import fileutil, idlib, log, time_format
9 import allmydata # for __full_version__
11 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
12 _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
13 from allmydata.storage.lease import LeaseInfo
14 from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
15 create_mutable_sharefile
16 from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
17 from allmydata.storage.crawler import BucketCountingCrawler
18 from allmydata.storage.expirer import LeaseCheckingCrawler
# Share-file layout on disk:
#
# storage/shares/incoming
#   incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
#   be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
# storage/shares/$START/$STORAGEINDEX
# storage/shares/$START/$STORAGEINDEX/$SHARENUM

# Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
# base-32 characters).

# $SHARENUM matches this regex: share files are named with the decimal
# share number, nothing else.
NUM_RE=re.compile("^[0-9]+$")
class StorageServer(service.MultiService, Referenceable):
    """A disk-backed storage server, remotely reachable via foolscap.

    Shares live under storedir/shares/; partially-uploaded shares live in
    shares/incoming/ until their BucketWriter is closed. Lease data is kept
    inside the share files themselves.
    """
    implements(RIStorageServer, IStatsProducer)
    # crawler class instantiated in __init__ for lease expiration; kept as a
    # class attribute so it can be overridden (presumably by tests — confirm)
    LeaseCheckerClass = LeaseCheckingCrawler
40 def __init__(self, storedir, nodeid, reserved_space=0,
41 discard_storage=False, readonly_storage=False,
43 expiration_enabled=False,
44 expiration_mode="age",
45 expiration_override_lease_duration=None,
46 expiration_cutoff_date=None,
47 expiration_sharetypes=("mutable", "immutable")):
48 service.MultiService.__init__(self)
49 assert isinstance(nodeid, str)
50 assert len(nodeid) == 20
51 self.my_nodeid = nodeid
52 self.storedir = storedir
53 sharedir = os.path.join(storedir, "shares")
54 fileutil.make_dirs(sharedir)
55 self.sharedir = sharedir
56 # we don't actually create the corruption-advisory dir until necessary
57 self.corruption_advisory_dir = os.path.join(storedir,
58 "corruption-advisories")
59 self.reserved_space = int(reserved_space)
60 self.no_storage = discard_storage
61 self.readonly_storage = readonly_storage
62 self.stats_provider = stats_provider
63 if self.stats_provider:
64 self.stats_provider.register_producer(self)
65 self.incomingdir = os.path.join(sharedir, 'incoming')
66 self._clean_incomplete()
67 fileutil.make_dirs(self.incomingdir)
68 self._active_writers = weakref.WeakKeyDictionary()
69 log.msg("StorageServer created", facility="tahoe.storage")
72 if self.get_available_space() is None:
73 log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
74 umin="0wZ27w", level=log.UNUSUAL)
76 self.latencies = {"allocate": [], # immutable
81 "writev": [], # mutable
83 "add-lease": [], # both
87 self.add_bucket_counter()
89 statefile = os.path.join(self.storedir, "lease_checker.state")
90 historyfile = os.path.join(self.storedir, "lease_checker.history")
91 klass = self.LeaseCheckerClass
92 self.lease_checker = klass(self, statefile, historyfile,
93 expiration_enabled, expiration_mode,
94 expiration_override_lease_duration,
95 expiration_cutoff_date,
96 expiration_sharetypes)
97 self.lease_checker.setServiceParent(self)
100 return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
102 def add_bucket_counter(self):
103 statefile = os.path.join(self.storedir, "bucket_counter.state")
104 self.bucket_counter = BucketCountingCrawler(self, statefile)
105 self.bucket_counter.setServiceParent(self)
107 def count(self, name, delta=1):
108 if self.stats_provider:
109 self.stats_provider.count("storage_server." + name, delta)
111 def add_latency(self, category, latency):
112 a = self.latencies[category]
115 self.latencies[category] = a[-1000:]
117 def get_latencies(self):
118 """Return a dict, indexed by category, that contains a dict of
119 latency numbers for each category. If there are sufficient samples
120 for unambiguous interpretation, each dict will contain the
121 following keys: mean, 01_0_percentile, 10_0_percentile,
122 50_0_percentile (median), 90_0_percentile, 95_0_percentile,
123 99_0_percentile, 99_9_percentile. If there are insufficient
124 samples for a given percentile to be interpreted unambiguously
125 that percentile will be reported as None. If no samples have been
126 collected for the given category, then that category name will
127 not be present in the return value. """
128 # note that Amazon's Dynamo paper says they use 99.9% percentile.
130 for category in self.latencies:
131 if not self.latencies[category]:
134 samples = self.latencies[category][:]
136 stats["samplesize"] = count
139 stats["mean"] = sum(samples) / count
143 orderstatlist = [(0.01, "01_0_percentile", 100), (0.1, "10_0_percentile", 10),\
144 (0.50, "50_0_percentile", 10), (0.90, "90_0_percentile", 10),\
145 (0.95, "95_0_percentile", 20), (0.99, "99_0_percentile", 100),\
146 (0.999, "99_9_percentile", 1000)]
148 for percentile, percentilestring, minnumtoobserve in orderstatlist:
149 if count >= minnumtoobserve:
150 stats[percentilestring] = samples[int(percentile*count)]
152 stats[percentilestring] = None
154 output[category] = stats
157 def log(self, *args, **kwargs):
158 if "facility" not in kwargs:
159 kwargs["facility"] = "tahoe.storage"
160 return log.msg(*args, **kwargs)
162 def _clean_incomplete(self):
163 fileutil.rm_dir(self.incomingdir)
166 # remember: RIStatsProvider requires that our return dict
167 # contains numeric values.
168 stats = { 'storage_server.allocated': self.allocated_size(), }
169 stats['storage_server.reserved_space'] = self.reserved_space
170 for category,ld in self.get_latencies().items():
171 for name,v in ld.items():
172 stats['storage_server.latencies.%s.%s' % (category, name)] = v
175 disk = fileutil.get_disk_stats(self.sharedir, self.reserved_space)
176 writeable = disk['avail'] > 0
178 # spacetime predictors should use disk_avail / (d(disk_used)/dt)
179 stats['storage_server.disk_total'] = disk['total']
180 stats['storage_server.disk_used'] = disk['used']
181 stats['storage_server.disk_free_for_root'] = disk['free_for_root']
182 stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
183 stats['storage_server.disk_avail'] = disk['avail']
184 except AttributeError:
186 except EnvironmentError:
187 log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
190 if self.readonly_storage:
191 stats['storage_server.disk_avail'] = 0
194 stats['storage_server.accepting_immutable_shares'] = int(writeable)
195 s = self.bucket_counter.get_state()
196 bucket_count = s.get("last-complete-bucket-count")
198 stats['storage_server.total_bucket_count'] = bucket_count
201 def get_available_space(self):
202 """Returns available space for share storage in bytes, or None if no
203 API to get this information is available."""
205 if self.readonly_storage:
207 return fileutil.get_available_space(self.sharedir, self.reserved_space)
209 def allocated_size(self):
211 for bw in self._active_writers:
212 space += bw.allocated_size()
215 def remote_get_version(self):
216 remaining_space = self.get_available_space()
217 if remaining_space is None:
218 # We're on a platform that has no API to get disk stats.
219 remaining_space = 2**64
221 version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
222 { "maximum-immutable-share-size": remaining_space,
223 "tolerates-immutable-read-overrun": True,
224 "delete-mutable-shares-with-zero-length-writev": True,
225 "fills-holes-with-zero-bytes": True,
226 "prevents-read-past-end-of-share-data": True,
228 "application-version": str(allmydata.__full_version__),
232 def remote_allocate_buckets(self, storage_index,
233 renew_secret, cancel_secret,
234 sharenums, allocated_size,
235 canary, owner_num=0):
236 # owner_num is not for clients to set, but rather it should be
237 # curried into the PersonalStorageServer instance that is dedicated
238 # to a particular owner.
240 self.count("allocate")
242 bucketwriters = {} # k: shnum, v: BucketWriter
243 si_dir = storage_index_to_dir(storage_index)
244 si_s = si_b2a(storage_index)
246 log.msg("storage: allocate_buckets %s" % si_s)
248 # in this implementation, the lease information (including secrets)
249 # goes into the share files themselves. It could also be put into a
250 # separate database. Note that the lease should not be added until
251 # the BucketWriter has been closed.
252 expire_time = time.time() + 31*24*60*60
253 lease_info = LeaseInfo(owner_num,
254 renew_secret, cancel_secret,
255 expire_time, self.my_nodeid)
257 max_space_per_bucket = allocated_size
259 remaining_space = self.get_available_space()
260 limited = remaining_space is not None
262 # this is a bit conservative, since some of this allocated_size()
263 # has already been written to disk, where it will show up in
264 # get_available_space.
265 remaining_space -= self.allocated_size()
266 # self.readonly_storage causes remaining_space <= 0
268 # fill alreadygot with all shares that we have, not just the ones
269 # they asked about: this will save them a lot of work. Add or update
270 # leases for all of them: if they want us to hold shares for this
271 # file, they'll want us to hold leases for this file.
272 for (shnum, fn) in self._get_bucket_shares(storage_index):
273 alreadygot.add(shnum)
275 sf.add_or_renew_lease(lease_info)
277 for shnum in sharenums:
278 incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
279 finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
280 if os.path.exists(finalhome):
281 # great! we already have it. easy.
283 elif os.path.exists(incominghome):
284 # Note that we don't create BucketWriters for shnums that
285 # have a partial share (in incoming/), so if a second upload
286 # occurs while the first is still in progress, the second
287 # uploader will use different storage servers.
289 elif (not limited) or (remaining_space >= max_space_per_bucket):
290 # ok! we need to create the new share file.
291 bw = BucketWriter(self, incominghome, finalhome,
292 max_space_per_bucket, lease_info, canary)
294 bw.throw_out_all_data = True
295 bucketwriters[shnum] = bw
296 self._active_writers[bw] = 1
298 remaining_space -= max_space_per_bucket
300 # bummer! not enough space to accept this bucket
304 fileutil.make_dirs(os.path.join(self.sharedir, si_dir))
306 self.add_latency("allocate", time.time() - start)
307 return alreadygot, bucketwriters
309 def _iter_share_files(self, storage_index):
310 for shnum, filename in self._get_bucket_shares(storage_index):
311 f = open(filename, 'rb')
314 if header[:32] == MutableShareFile.MAGIC:
315 sf = MutableShareFile(filename, self)
316 # note: if the share has been migrated, the renew_lease()
317 # call will throw an exception, with information to help the
318 # client update the lease.
319 elif header[:4] == struct.pack(">L", 1):
320 sf = ShareFile(filename)
322 continue # non-sharefile
325 def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
328 self.count("add-lease")
329 new_expire_time = time.time() + 31*24*60*60
330 lease_info = LeaseInfo(owner_num,
331 renew_secret, cancel_secret,
332 new_expire_time, self.my_nodeid)
333 for sf in self._iter_share_files(storage_index):
334 sf.add_or_renew_lease(lease_info)
335 self.add_latency("add-lease", time.time() - start)
338 def remote_renew_lease(self, storage_index, renew_secret):
341 new_expire_time = time.time() + 31*24*60*60
342 found_buckets = False
343 for sf in self._iter_share_files(storage_index):
345 sf.renew_lease(renew_secret, new_expire_time)
346 self.add_latency("renew", time.time() - start)
347 if not found_buckets:
348 raise IndexError("no such lease to renew")
350 def bucket_writer_closed(self, bw, consumed_size):
351 if self.stats_provider:
352 self.stats_provider.count('storage_server.bytes_added', consumed_size)
353 del self._active_writers[bw]
355 def _get_bucket_shares(self, storage_index):
356 """Return a list of (shnum, pathname) tuples for files that hold
357 shares for this storage_index. In each tuple, 'shnum' will always be
358 the integer form of the last component of 'pathname'."""
359 storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
361 for f in os.listdir(storagedir):
363 filename = os.path.join(storagedir, f)
364 yield (int(f), filename)
366 # Commonly caused by there being no buckets at all.
369 def remote_get_buckets(self, storage_index):
372 si_s = si_b2a(storage_index)
373 log.msg("storage: get_buckets %s" % si_s)
374 bucketreaders = {} # k: sharenum, v: BucketReader
375 for shnum, filename in self._get_bucket_shares(storage_index):
376 bucketreaders[shnum] = BucketReader(self, filename,
377 storage_index, shnum)
378 self.add_latency("get", time.time() - start)
381 def get_leases(self, storage_index):
382 """Provide an iterator that yields all of the leases attached to this
383 bucket. Each lease is returned as a LeaseInfo instance.
385 This method is not for client use.
388 # since all shares get the same lease data, we just grab the leases
389 # from the first share
391 shnum, filename = self._get_bucket_shares(storage_index).next()
392 sf = ShareFile(filename)
393 return sf.get_leases()
394 except StopIteration:
397 def remote_slot_testv_and_readv_and_writev(self, storage_index,
399 test_and_write_vectors,
403 si_s = si_b2a(storage_index)
404 log.msg("storage: slot_writev %s" % si_s)
405 si_dir = storage_index_to_dir(storage_index)
406 (write_enabler, renew_secret, cancel_secret) = secrets
407 # shares exist if there is a file for them
408 bucketdir = os.path.join(self.sharedir, si_dir)
410 if os.path.isdir(bucketdir):
411 for sharenum_s in os.listdir(bucketdir):
413 sharenum = int(sharenum_s)
416 filename = os.path.join(bucketdir, sharenum_s)
417 msf = MutableShareFile(filename, self)
418 msf.check_write_enabler(write_enabler, si_s)
419 shares[sharenum] = msf
420 # write_enabler is good for all existing shares.
422 # Now evaluate test vectors.
424 for sharenum in test_and_write_vectors:
425 (testv, datav, new_length) = test_and_write_vectors[sharenum]
426 if sharenum in shares:
427 if not shares[sharenum].check_testv(testv):
428 self.log("testv failed: [%d]: %r" % (sharenum, testv))
429 testv_is_good = False
432 # compare the vectors against an empty share, in which all
433 # reads return empty strings.
434 if not EmptyShare().check_testv(testv):
435 self.log("testv failed (empty): [%d] %r" % (sharenum,
437 testv_is_good = False
440 # now gather the read vectors, before we do any writes
442 for sharenum, share in shares.items():
443 read_data[sharenum] = share.readv(read_vector)
446 expire_time = time.time() + 31*24*60*60 # one month
447 lease_info = LeaseInfo(ownerid,
448 renew_secret, cancel_secret,
449 expire_time, self.my_nodeid)
452 # now apply the write vectors
453 for sharenum in test_and_write_vectors:
454 (testv, datav, new_length) = test_and_write_vectors[sharenum]
456 if sharenum in shares:
457 shares[sharenum].unlink()
459 if sharenum not in shares:
460 # allocate a new share
461 allocated_size = 2000 # arbitrary, really
462 share = self._allocate_slot_share(bucketdir, secrets,
466 shares[sharenum] = share
467 shares[sharenum].writev(datav, new_length)
468 # and update the lease
469 shares[sharenum].add_or_renew_lease(lease_info)
472 # delete empty bucket directories
473 if not os.listdir(bucketdir):
478 self.add_latency("writev", time.time() - start)
479 return (testv_is_good, read_data)
481 def _allocate_slot_share(self, bucketdir, secrets, sharenum,
482 allocated_size, owner_num=0):
483 (write_enabler, renew_secret, cancel_secret) = secrets
484 my_nodeid = self.my_nodeid
485 fileutil.make_dirs(bucketdir)
486 filename = os.path.join(bucketdir, "%d" % sharenum)
487 share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
491 def remote_slot_readv(self, storage_index, shares, readv):
494 si_s = si_b2a(storage_index)
495 lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
496 facility="tahoe.storage", level=log.OPERATIONAL)
497 si_dir = storage_index_to_dir(storage_index)
498 # shares exist if there is a file for them
499 bucketdir = os.path.join(self.sharedir, si_dir)
500 if not os.path.isdir(bucketdir):
501 self.add_latency("readv", time.time() - start)
504 for sharenum_s in os.listdir(bucketdir):
506 sharenum = int(sharenum_s)
509 if sharenum in shares or not shares:
510 filename = os.path.join(bucketdir, sharenum_s)
511 msf = MutableShareFile(filename, self)
512 datavs[sharenum] = msf.readv(readv)
513 log.msg("returning shares %s" % (datavs.keys(),),
514 facility="tahoe.storage", level=log.NOISY, parent=lp)
515 self.add_latency("readv", time.time() - start)
518 def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
520 fileutil.make_dirs(self.corruption_advisory_dir)
521 now = time_format.iso_utc(sep="T")
522 si_s = si_b2a(storage_index)
523 # windows can't handle colons in the filename
524 fn = os.path.join(self.corruption_advisory_dir,
525 "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
527 f.write("report: Share Corruption\n")
528 f.write("type: %s\n" % share_type)
529 f.write("storage_index: %s\n" % si_s)
530 f.write("share_number: %d\n" % shnum)
535 log.msg(format=("client claims corruption in (%(share_type)s) " +
536 "%(si)s-%(shnum)d: %(reason)s"),
537 share_type=share_type, si=si_s, shnum=shnum, reason=reason,
538 level=log.SCARY, umid="SGx2fA")