1 import os, re, weakref, struct, time
3 from foolscap.api import Referenceable
4 from twisted.application import service
6 from zope.interface import implements
7 from allmydata.interfaces import RIStorageServer, IStatsProducer
8 from allmydata.util import fileutil, idlib, log, time_format
9 import allmydata # for __full_version__
11 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
12 _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
13 from allmydata.storage.lease import LeaseInfo
14 from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
15 create_mutable_sharefile
16 from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
17 from allmydata.storage.crawler import BucketCountingCrawler
18 from allmydata.storage.expirer import LeaseCheckingCrawler
21 # storage/shares/incoming
22 # incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
23 # be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
24 # storage/shares/$START/$STORAGEINDEX
25 # storage/shares/$START/$STORAGEINDEX/$SHARENUM
# Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
# base-32 chars).
30 # $SHARENUM matches this regex:
31 NUM_RE=re.compile("^[0-9]+$")
35 class StorageServer(service.MultiService, Referenceable):
36 implements(RIStorageServer, IStatsProducer)
38 LeaseCheckerClass = LeaseCheckingCrawler
40 def __init__(self, storedir, nodeid, reserved_space=0,
41 discard_storage=False, readonly_storage=False,
43 expiration_enabled=False,
44 expiration_mode="age",
45 expiration_override_lease_duration=None,
46 expiration_cutoff_date=None,
47 expiration_sharetypes=("mutable", "immutable")):
48 service.MultiService.__init__(self)
49 assert isinstance(nodeid, str)
50 assert len(nodeid) == 20
51 self.my_nodeid = nodeid
52 self.storedir = storedir
53 sharedir = os.path.join(storedir, "shares")
54 fileutil.make_dirs(sharedir)
55 self.sharedir = sharedir
56 # we don't actually create the corruption-advisory dir until necessary
57 self.corruption_advisory_dir = os.path.join(storedir,
58 "corruption-advisories")
59 self.reserved_space = int(reserved_space)
60 self.no_storage = discard_storage
61 self.readonly_storage = readonly_storage
62 self.stats_provider = stats_provider
63 if self.stats_provider:
64 self.stats_provider.register_producer(self)
65 self.incomingdir = os.path.join(sharedir, 'incoming')
66 self._clean_incomplete()
67 fileutil.make_dirs(self.incomingdir)
68 self._active_writers = weakref.WeakKeyDictionary()
69 log.msg("StorageServer created", facility="tahoe.storage")
72 if self.get_available_space() is None:
73 log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
74 umin="0wZ27w", level=log.UNUSUAL)
76 self.latencies = {"allocate": [], # immutable
81 "writev": [], # mutable
83 "add-lease": [], # both
87 self.add_bucket_counter()
89 statefile = os.path.join(self.storedir, "lease_checker.state")
90 historyfile = os.path.join(self.storedir, "lease_checker.history")
91 klass = self.LeaseCheckerClass
92 self.lease_checker = klass(self, statefile, historyfile,
93 expiration_enabled, expiration_mode,
94 expiration_override_lease_duration,
95 expiration_cutoff_date,
96 expiration_sharetypes)
97 self.lease_checker.setServiceParent(self)
100 return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
102 def add_bucket_counter(self):
103 statefile = os.path.join(self.storedir, "bucket_counter.state")
104 self.bucket_counter = BucketCountingCrawler(self, statefile)
105 self.bucket_counter.setServiceParent(self)
107 def count(self, name, delta=1):
108 if self.stats_provider:
109 self.stats_provider.count("storage_server." + name, delta)
111 def add_latency(self, category, latency):
112 a = self.latencies[category]
115 self.latencies[category] = a[-1000:]
117 def get_latencies(self):
118 """Return a dict, indexed by category, that contains a dict of
119 latency numbers for each category. If there are sufficient samples
120 for unambiguous interpretation, each dict will contain the
121 following keys: mean, 01_0_percentile, 10_0_percentile,
122 50_0_percentile (median), 90_0_percentile, 95_0_percentile,
123 99_0_percentile, 99_9_percentile. If there are insufficient
124 samples for a given percentile to be interpreted unambiguously
125 that percentile will be reported as None. If no samples have been
126 collected for the given category, then that category name will
127 not be present in the return value. """
128 # note that Amazon's Dynamo paper says they use 99.9% percentile.
130 for category in self.latencies:
131 if not self.latencies[category]:
134 samples = self.latencies[category][:]
136 stats["samplesize"] = count
139 stats["mean"] = sum(samples) / count
143 orderstatlist = [(0.01, "01_0_percentile", 100), (0.1, "10_0_percentile", 10),\
144 (0.50, "50_0_percentile", 10), (0.90, "90_0_percentile", 10),\
145 (0.95, "95_0_percentile", 20), (0.99, "99_0_percentile", 100),\
146 (0.999, "99_9_percentile", 1000)]
148 for percentile, percentilestring, minnumtoobserve in orderstatlist:
149 if count >= minnumtoobserve:
150 stats[percentilestring] = samples[int(percentile*count)]
152 stats[percentilestring] = None
154 output[category] = stats
157 def log(self, *args, **kwargs):
158 if "facility" not in kwargs:
159 kwargs["facility"] = "tahoe.storage"
160 return log.msg(*args, **kwargs)
162 def _clean_incomplete(self):
163 fileutil.rm_dir(self.incomingdir)
166 # remember: RIStatsProvider requires that our return dict
167 # contains numeric values.
168 stats = { 'storage_server.allocated': self.allocated_size(), }
169 stats['storage_server.reserved_space'] = self.reserved_space
170 for category,ld in self.get_latencies().items():
171 for name,v in ld.items():
172 stats['storage_server.latencies.%s.%s' % (category, name)] = v
175 disk = fileutil.get_disk_stats(self.sharedir, self.reserved_space)
176 writeable = disk['avail'] > 0
178 # spacetime predictors should use disk_avail / (d(disk_used)/dt)
179 stats['storage_server.disk_total'] = disk['total']
180 stats['storage_server.disk_used'] = disk['used']
181 stats['storage_server.disk_free_for_root'] = disk['free_for_root']
182 stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
183 stats['storage_server.disk_avail'] = disk['avail']
184 except AttributeError:
186 except EnvironmentError:
187 log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
190 if self.readonly_storage:
191 stats['storage_server.disk_avail'] = 0
194 stats['storage_server.accepting_immutable_shares'] = int(writeable)
195 s = self.bucket_counter.get_state()
196 bucket_count = s.get("last-complete-bucket-count")
198 stats['storage_server.total_bucket_count'] = bucket_count
201 def get_available_space(self):
202 """Returns available space for share storage in bytes, or None if no
203 API to get this information is available."""
205 if self.readonly_storage:
207 return fileutil.get_available_space(self.sharedir, self.reserved_space)
209 def allocated_size(self):
211 for bw in self._active_writers:
212 space += bw.allocated_size()
215 def remote_get_version(self):
216 remaining_space = self.get_available_space()
217 if remaining_space is None:
218 # We're on a platform that has no API to get disk stats.
219 remaining_space = 2**64
221 version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
222 { "maximum-immutable-share-size": remaining_space,
223 "tolerates-immutable-read-overrun": True,
224 "delete-mutable-shares-with-zero-length-writev": True,
226 "application-version": str(allmydata.__full_version__),
230 def remote_allocate_buckets(self, storage_index,
231 renew_secret, cancel_secret,
232 sharenums, allocated_size,
233 canary, owner_num=0):
234 # owner_num is not for clients to set, but rather it should be
235 # curried into the PersonalStorageServer instance that is dedicated
236 # to a particular owner.
238 self.count("allocate")
240 bucketwriters = {} # k: shnum, v: BucketWriter
241 si_dir = storage_index_to_dir(storage_index)
242 si_s = si_b2a(storage_index)
244 log.msg("storage: allocate_buckets %s" % si_s)
246 # in this implementation, the lease information (including secrets)
247 # goes into the share files themselves. It could also be put into a
248 # separate database. Note that the lease should not be added until
249 # the BucketWriter has been closed.
250 expire_time = time.time() + 31*24*60*60
251 lease_info = LeaseInfo(owner_num,
252 renew_secret, cancel_secret,
253 expire_time, self.my_nodeid)
255 max_space_per_bucket = allocated_size
257 remaining_space = self.get_available_space()
258 limited = remaining_space is not None
260 # this is a bit conservative, since some of this allocated_size()
261 # has already been written to disk, where it will show up in
262 # get_available_space.
263 remaining_space -= self.allocated_size()
264 # self.readonly_storage causes remaining_space <= 0
266 # fill alreadygot with all shares that we have, not just the ones
267 # they asked about: this will save them a lot of work. Add or update
268 # leases for all of them: if they want us to hold shares for this
269 # file, they'll want us to hold leases for this file.
270 for (shnum, fn) in self._get_bucket_shares(storage_index):
271 alreadygot.add(shnum)
273 sf.add_or_renew_lease(lease_info)
275 for shnum in sharenums:
276 incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
277 finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
278 if os.path.exists(finalhome):
279 # great! we already have it. easy.
281 elif os.path.exists(incominghome):
282 # Note that we don't create BucketWriters for shnums that
283 # have a partial share (in incoming/), so if a second upload
284 # occurs while the first is still in progress, the second
285 # uploader will use different storage servers.
287 elif (not limited) or (remaining_space >= max_space_per_bucket):
288 # ok! we need to create the new share file.
289 bw = BucketWriter(self, incominghome, finalhome,
290 max_space_per_bucket, lease_info, canary)
292 bw.throw_out_all_data = True
293 bucketwriters[shnum] = bw
294 self._active_writers[bw] = 1
296 remaining_space -= max_space_per_bucket
298 # bummer! not enough space to accept this bucket
302 fileutil.make_dirs(os.path.join(self.sharedir, si_dir))
304 self.add_latency("allocate", time.time() - start)
305 return alreadygot, bucketwriters
307 def _iter_share_files(self, storage_index):
308 for shnum, filename in self._get_bucket_shares(storage_index):
309 f = open(filename, 'rb')
312 if header[:32] == MutableShareFile.MAGIC:
313 sf = MutableShareFile(filename, self)
314 # note: if the share has been migrated, the renew_lease()
315 # call will throw an exception, with information to help the
316 # client update the lease.
317 elif header[:4] == struct.pack(">L", 1):
318 sf = ShareFile(filename)
320 continue # non-sharefile
323 def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
326 self.count("add-lease")
327 new_expire_time = time.time() + 31*24*60*60
328 lease_info = LeaseInfo(owner_num,
329 renew_secret, cancel_secret,
330 new_expire_time, self.my_nodeid)
331 for sf in self._iter_share_files(storage_index):
332 sf.add_or_renew_lease(lease_info)
333 self.add_latency("add-lease", time.time() - start)
336 def remote_renew_lease(self, storage_index, renew_secret):
339 new_expire_time = time.time() + 31*24*60*60
340 found_buckets = False
341 for sf in self._iter_share_files(storage_index):
343 sf.renew_lease(renew_secret, new_expire_time)
344 self.add_latency("renew", time.time() - start)
345 if not found_buckets:
346 raise IndexError("no such lease to renew")
348 def remote_cancel_lease(self, storage_index, cancel_secret):
352 total_space_freed = 0
353 found_buckets = False
354 for sf in self._iter_share_files(storage_index):
355 # note: if we can't find a lease on one share, we won't bother
356 # looking in the others. Unless something broke internally
357 # (perhaps we ran out of disk space while adding a lease), the
358 # leases on all shares will be identical.
360 # this raises IndexError if the lease wasn't present XXXX
361 total_space_freed += sf.cancel_lease(cancel_secret)
364 storagedir = os.path.join(self.sharedir,
365 storage_index_to_dir(storage_index))
366 if not os.listdir(storagedir):
369 if self.stats_provider:
370 self.stats_provider.count('storage_server.bytes_freed',
372 self.add_latency("cancel", time.time() - start)
373 if not found_buckets:
374 raise IndexError("no such storage index")
376 def bucket_writer_closed(self, bw, consumed_size):
377 if self.stats_provider:
378 self.stats_provider.count('storage_server.bytes_added', consumed_size)
379 del self._active_writers[bw]
381 def _get_bucket_shares(self, storage_index):
382 """Return a list of (shnum, pathname) tuples for files that hold
383 shares for this storage_index. In each tuple, 'shnum' will always be
384 the integer form of the last component of 'pathname'."""
385 storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
387 for f in os.listdir(storagedir):
389 filename = os.path.join(storagedir, f)
390 yield (int(f), filename)
392 # Commonly caused by there being no buckets at all.
395 def remote_get_buckets(self, storage_index):
398 si_s = si_b2a(storage_index)
399 log.msg("storage: get_buckets %s" % si_s)
400 bucketreaders = {} # k: sharenum, v: BucketReader
401 for shnum, filename in self._get_bucket_shares(storage_index):
402 bucketreaders[shnum] = BucketReader(self, filename,
403 storage_index, shnum)
404 self.add_latency("get", time.time() - start)
407 def get_leases(self, storage_index):
408 """Provide an iterator that yields all of the leases attached to this
409 bucket. Each lease is returned as a LeaseInfo instance.
411 This method is not for client use.
414 # since all shares get the same lease data, we just grab the leases
415 # from the first share
417 shnum, filename = self._get_bucket_shares(storage_index).next()
418 sf = ShareFile(filename)
419 return sf.get_leases()
420 except StopIteration:
423 def remote_slot_testv_and_readv_and_writev(self, storage_index,
425 test_and_write_vectors,
429 si_s = si_b2a(storage_index)
430 log.msg("storage: slot_writev %s" % si_s)
431 si_dir = storage_index_to_dir(storage_index)
432 (write_enabler, renew_secret, cancel_secret) = secrets
433 # shares exist if there is a file for them
434 bucketdir = os.path.join(self.sharedir, si_dir)
436 if os.path.isdir(bucketdir):
437 for sharenum_s in os.listdir(bucketdir):
439 sharenum = int(sharenum_s)
442 filename = os.path.join(bucketdir, sharenum_s)
443 msf = MutableShareFile(filename, self)
444 msf.check_write_enabler(write_enabler, si_s)
445 shares[sharenum] = msf
446 # write_enabler is good for all existing shares.
448 # Now evaluate test vectors.
450 for sharenum in test_and_write_vectors:
451 (testv, datav, new_length) = test_and_write_vectors[sharenum]
452 if sharenum in shares:
453 if not shares[sharenum].check_testv(testv):
454 self.log("testv failed: [%d]: %r" % (sharenum, testv))
455 testv_is_good = False
458 # compare the vectors against an empty share, in which all
459 # reads return empty strings.
460 if not EmptyShare().check_testv(testv):
461 self.log("testv failed (empty): [%d] %r" % (sharenum,
463 testv_is_good = False
466 # now gather the read vectors, before we do any writes
468 for sharenum, share in shares.items():
469 read_data[sharenum] = share.readv(read_vector)
472 expire_time = time.time() + 31*24*60*60 # one month
473 lease_info = LeaseInfo(ownerid,
474 renew_secret, cancel_secret,
475 expire_time, self.my_nodeid)
478 # now apply the write vectors
479 for sharenum in test_and_write_vectors:
480 (testv, datav, new_length) = test_and_write_vectors[sharenum]
482 if sharenum in shares:
483 shares[sharenum].unlink()
485 if sharenum not in shares:
486 # allocate a new share
487 allocated_size = 2000 # arbitrary, really
488 share = self._allocate_slot_share(bucketdir, secrets,
492 shares[sharenum] = share
493 shares[sharenum].writev(datav, new_length)
494 # and update the lease
495 shares[sharenum].add_or_renew_lease(lease_info)
498 # delete empty bucket directories
499 if not os.listdir(bucketdir):
504 self.add_latency("writev", time.time() - start)
505 return (testv_is_good, read_data)
507 def _allocate_slot_share(self, bucketdir, secrets, sharenum,
508 allocated_size, owner_num=0):
509 (write_enabler, renew_secret, cancel_secret) = secrets
510 my_nodeid = self.my_nodeid
511 fileutil.make_dirs(bucketdir)
512 filename = os.path.join(bucketdir, "%d" % sharenum)
513 share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
517 def remote_slot_readv(self, storage_index, shares, readv):
520 si_s = si_b2a(storage_index)
521 lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
522 facility="tahoe.storage", level=log.OPERATIONAL)
523 si_dir = storage_index_to_dir(storage_index)
524 # shares exist if there is a file for them
525 bucketdir = os.path.join(self.sharedir, si_dir)
526 if not os.path.isdir(bucketdir):
527 self.add_latency("readv", time.time() - start)
530 for sharenum_s in os.listdir(bucketdir):
532 sharenum = int(sharenum_s)
535 if sharenum in shares or not shares:
536 filename = os.path.join(bucketdir, sharenum_s)
537 msf = MutableShareFile(filename, self)
538 datavs[sharenum] = msf.readv(readv)
539 log.msg("returning shares %s" % (datavs.keys(),),
540 facility="tahoe.storage", level=log.NOISY, parent=lp)
541 self.add_latency("readv", time.time() - start)
544 def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
546 fileutil.make_dirs(self.corruption_advisory_dir)
547 now = time_format.iso_utc(sep="T")
548 si_s = si_b2a(storage_index)
549 # windows can't handle colons in the filename
550 fn = os.path.join(self.corruption_advisory_dir,
551 "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
553 f.write("report: Share Corruption\n")
554 f.write("type: %s\n" % share_type)
555 f.write("storage_index: %s\n" % si_s)
556 f.write("share_number: %d\n" % shnum)
561 log.msg(format=("client claims corruption in (%(share_type)s) " +
562 "%(si)s-%(shnum)d: %(reason)s"),
563 share_type=share_type, si=si_s, shnum=shnum, reason=reason,
564 level=log.SCARY, umid="SGx2fA")