1 import os, re, weakref, struct, time
3 from foolscap.api import Referenceable
4 from twisted.application import service
6 from zope.interface import implements
7 from allmydata.interfaces import RIStorageServer, IStatsProducer
8 from allmydata.util import fileutil, idlib, log, time_format
9 import allmydata # for __full_version__
11 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
12 _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
13 from allmydata.storage.lease import LeaseInfo
14 from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
15 create_mutable_sharefile
16 from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
17 from allmydata.storage.crawler import BucketCountingCrawler
18 from allmydata.storage.expirer import LeaseCheckingCrawler
# On-disk layout used by this server (relative to the storage directory):
# storage/shares/incoming
# incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
# be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
# storage/shares/$START/$STORAGEINDEX
# storage/shares/$START/$STORAGEINDEX/$SHARENUM

# Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
# base-32 characters).

# $SHARENUM matches this regex:
NUM_RE=re.compile("^[0-9]+$")
class StorageServer(service.MultiService, Referenceable):
    # A storage server: published over foolscap as an RIStorageServer, and
    # registered as an IStatsProducer with the node's stats provider.
    implements(RIStorageServer, IStatsProducer)
    # Class-level hook for the lease-expiration crawler; NOTE(review):
    # presumably a seam so subclasses/tests can substitute another crawler
    # class -- confirm against callers.
    LeaseCheckerClass = LeaseCheckingCrawler
40 def __init__(self, storedir, nodeid, reserved_space=0,
41 discard_storage=False, readonly_storage=False,
43 expiration_enabled=False,
44 expiration_mode="age",
45 expiration_override_lease_duration=None,
46 expiration_cutoff_date=None,
47 expiration_sharetypes=("mutable", "immutable")):
48 service.MultiService.__init__(self)
49 assert isinstance(nodeid, str)
50 assert len(nodeid) == 20
51 self.my_nodeid = nodeid
52 self.storedir = storedir
53 sharedir = os.path.join(storedir, "shares")
54 fileutil.make_dirs(sharedir)
55 self.sharedir = sharedir
56 # we don't actually create the corruption-advisory dir until necessary
57 self.corruption_advisory_dir = os.path.join(storedir,
58 "corruption-advisories")
59 self.reserved_space = int(reserved_space)
60 self.no_storage = discard_storage
61 self.readonly_storage = readonly_storage
62 self.stats_provider = stats_provider
63 if self.stats_provider:
64 self.stats_provider.register_producer(self)
65 self.incomingdir = os.path.join(sharedir, 'incoming')
66 self._clean_incomplete()
67 fileutil.make_dirs(self.incomingdir)
68 self._active_writers = weakref.WeakKeyDictionary()
69 log.msg("StorageServer created", facility="tahoe.storage")
72 if self.get_available_space() is None:
73 log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
74 umin="0wZ27w", level=log.UNUSUAL)
76 self.latencies = {"allocate": [], # immutable
81 "writev": [], # mutable
83 "add-lease": [], # both
87 self.add_bucket_counter()
89 statefile = os.path.join(self.storedir, "lease_checker.state")
90 historyfile = os.path.join(self.storedir, "lease_checker.history")
91 klass = self.LeaseCheckerClass
92 self.lease_checker = klass(self, statefile, historyfile,
93 expiration_enabled, expiration_mode,
94 expiration_override_lease_duration,
95 expiration_cutoff_date,
96 expiration_sharetypes)
97 self.lease_checker.setServiceParent(self)
100 return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
102 def add_bucket_counter(self):
103 statefile = os.path.join(self.storedir, "bucket_counter.state")
104 self.bucket_counter = BucketCountingCrawler(self, statefile)
105 self.bucket_counter.setServiceParent(self)
107 def count(self, name, delta=1):
108 if self.stats_provider:
109 self.stats_provider.count("storage_server." + name, delta)
111 def add_latency(self, category, latency):
112 a = self.latencies[category]
115 self.latencies[category] = a[-1000:]
117 def get_latencies(self):
118 """Return a dict, indexed by category, that contains a dict of
119 latency numbers for each category. Each dict will contain the
120 following keys: mean, 01_0_percentile, 10_0_percentile,
121 50_0_percentile (median), 90_0_percentile, 95_0_percentile,
122 99_0_percentile, 99_9_percentile. If no samples have been collected
123 for the given category, then that category name will not be present
124 in the return value."""
125 # note that Amazon's Dynamo paper says they use 99.9% percentile.
127 for category in self.latencies:
128 if not self.latencies[category]:
131 samples = self.latencies[category][:]
134 stats["mean"] = sum(samples) / count
135 stats["01_0_percentile"] = samples[int(0.01 * count)]
136 stats["10_0_percentile"] = samples[int(0.1 * count)]
137 stats["50_0_percentile"] = samples[int(0.5 * count)]
138 stats["90_0_percentile"] = samples[int(0.9 * count)]
139 stats["95_0_percentile"] = samples[int(0.95 * count)]
140 stats["99_0_percentile"] = samples[int(0.99 * count)]
141 stats["99_9_percentile"] = samples[int(0.999 * count)]
142 output[category] = stats
145 def log(self, *args, **kwargs):
146 if "facility" not in kwargs:
147 kwargs["facility"] = "tahoe.storage"
148 return log.msg(*args, **kwargs)
    def _clean_incomplete(self):
        # Remove the whole incoming/ directory: any partially-uploaded shares
        # left by a previous run are discarded. __init__ recreates the empty
        # directory immediately after calling this.
        fileutil.rm_dir(self.incomingdir)
154 # remember: RIStatsProvider requires that our return dict
155 # contains numeric values.
156 stats = { 'storage_server.allocated': self.allocated_size(), }
157 stats['storage_server.reserved_space'] = self.reserved_space
158 for category,ld in self.get_latencies().items():
159 for name,v in ld.items():
160 stats['storage_server.latencies.%s.%s' % (category, name)] = v
163 disk = fileutil.get_disk_stats(self.storedir, self.reserved_space)
164 writeable = disk['avail'] > 0
166 # spacetime predictors should use disk_avail / (d(disk_used)/dt)
167 stats['storage_server.disk_total'] = disk['total']
168 stats['storage_server.disk_used'] = disk['used']
169 stats['storage_server.disk_free_for_root'] = disk['free_for_root']
170 stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
171 stats['storage_server.disk_avail'] = disk['avail']
172 except AttributeError:
174 except EnvironmentError:
175 log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
178 if self.readonly_storage:
179 stats['storage_server.disk_avail'] = 0
182 stats['storage_server.accepting_immutable_shares'] = int(writeable)
183 s = self.bucket_counter.get_state()
184 bucket_count = s.get("last-complete-bucket-count")
186 stats['storage_server.total_bucket_count'] = bucket_count
189 def get_available_space(self):
190 """Returns available space for share storage in bytes, or None if no
191 API to get this information is available."""
193 if self.readonly_storage:
195 return fileutil.get_available_space(self.storedir, self.reserved_space)
197 def allocated_size(self):
199 for bw in self._active_writers:
200 space += bw.allocated_size()
203 def remote_get_version(self):
204 remaining_space = self.get_available_space()
205 if remaining_space is None:
206 # We're on a platform that has no API to get disk stats.
207 remaining_space = 2**64
209 version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
210 { "maximum-immutable-share-size": remaining_space,
211 "tolerates-immutable-read-overrun": True,
212 "delete-mutable-shares-with-zero-length-writev": True,
214 "application-version": str(allmydata.__full_version__),
218 def remote_allocate_buckets(self, storage_index,
219 renew_secret, cancel_secret,
220 sharenums, allocated_size,
221 canary, owner_num=0):
222 # owner_num is not for clients to set, but rather it should be
223 # curried into the PersonalStorageServer instance that is dedicated
224 # to a particular owner.
226 self.count("allocate")
228 bucketwriters = {} # k: shnum, v: BucketWriter
229 si_dir = storage_index_to_dir(storage_index)
230 si_s = si_b2a(storage_index)
232 log.msg("storage: allocate_buckets %s" % si_s)
234 # in this implementation, the lease information (including secrets)
235 # goes into the share files themselves. It could also be put into a
236 # separate database. Note that the lease should not be added until
237 # the BucketWriter has been closed.
238 expire_time = time.time() + 31*24*60*60
239 lease_info = LeaseInfo(owner_num,
240 renew_secret, cancel_secret,
241 expire_time, self.my_nodeid)
243 max_space_per_bucket = allocated_size
245 remaining_space = self.get_available_space()
246 limited = remaining_space is not None
248 # this is a bit conservative, since some of this allocated_size()
249 # has already been written to disk, where it will show up in
250 # get_available_space.
251 remaining_space -= self.allocated_size()
252 # self.readonly_storage causes remaining_space <= 0
254 # fill alreadygot with all shares that we have, not just the ones
255 # they asked about: this will save them a lot of work. Add or update
256 # leases for all of them: if they want us to hold shares for this
257 # file, they'll want us to hold leases for this file.
258 for (shnum, fn) in self._get_bucket_shares(storage_index):
259 alreadygot.add(shnum)
261 sf.add_or_renew_lease(lease_info)
263 for shnum in sharenums:
264 incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
265 finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
266 if os.path.exists(finalhome):
267 # great! we already have it. easy.
269 elif os.path.exists(incominghome):
270 # Note that we don't create BucketWriters for shnums that
271 # have a partial share (in incoming/), so if a second upload
272 # occurs while the first is still in progress, the second
273 # uploader will use different storage servers.
275 elif (not limited) or (remaining_space >= max_space_per_bucket):
276 # ok! we need to create the new share file.
277 bw = BucketWriter(self, incominghome, finalhome,
278 max_space_per_bucket, lease_info, canary)
280 bw.throw_out_all_data = True
281 bucketwriters[shnum] = bw
282 self._active_writers[bw] = 1
284 remaining_space -= max_space_per_bucket
286 # bummer! not enough space to accept this bucket
290 fileutil.make_dirs(os.path.join(self.sharedir, si_dir))
292 self.add_latency("allocate", time.time() - start)
293 return alreadygot, bucketwriters
295 def _iter_share_files(self, storage_index):
296 for shnum, filename in self._get_bucket_shares(storage_index):
297 f = open(filename, 'rb')
300 if header[:32] == MutableShareFile.MAGIC:
301 sf = MutableShareFile(filename, self)
302 # note: if the share has been migrated, the renew_lease()
303 # call will throw an exception, with information to help the
304 # client update the lease.
305 elif header[:4] == struct.pack(">L", 1):
306 sf = ShareFile(filename)
308 continue # non-sharefile
311 def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
314 self.count("add-lease")
315 new_expire_time = time.time() + 31*24*60*60
316 lease_info = LeaseInfo(owner_num,
317 renew_secret, cancel_secret,
318 new_expire_time, self.my_nodeid)
319 for sf in self._iter_share_files(storage_index):
320 sf.add_or_renew_lease(lease_info)
321 self.add_latency("add-lease", time.time() - start)
324 def remote_renew_lease(self, storage_index, renew_secret):
327 new_expire_time = time.time() + 31*24*60*60
328 found_buckets = False
329 for sf in self._iter_share_files(storage_index):
331 sf.renew_lease(renew_secret, new_expire_time)
332 self.add_latency("renew", time.time() - start)
333 if not found_buckets:
334 raise IndexError("no such lease to renew")
336 def remote_cancel_lease(self, storage_index, cancel_secret):
340 total_space_freed = 0
341 found_buckets = False
342 for sf in self._iter_share_files(storage_index):
343 # note: if we can't find a lease on one share, we won't bother
344 # looking in the others. Unless something broke internally
345 # (perhaps we ran out of disk space while adding a lease), the
346 # leases on all shares will be identical.
348 # this raises IndexError if the lease wasn't present XXXX
349 total_space_freed += sf.cancel_lease(cancel_secret)
352 storagedir = os.path.join(self.sharedir,
353 storage_index_to_dir(storage_index))
354 if not os.listdir(storagedir):
357 if self.stats_provider:
358 self.stats_provider.count('storage_server.bytes_freed',
360 self.add_latency("cancel", time.time() - start)
361 if not found_buckets:
362 raise IndexError("no such storage index")
364 def bucket_writer_closed(self, bw, consumed_size):
365 if self.stats_provider:
366 self.stats_provider.count('storage_server.bytes_added', consumed_size)
367 del self._active_writers[bw]
369 def _get_bucket_shares(self, storage_index):
370 """Return a list of (shnum, pathname) tuples for files that hold
371 shares for this storage_index. In each tuple, 'shnum' will always be
372 the integer form of the last component of 'pathname'."""
373 storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
375 for f in os.listdir(storagedir):
377 filename = os.path.join(storagedir, f)
378 yield (int(f), filename)
380 # Commonly caused by there being no buckets at all.
383 def remote_get_buckets(self, storage_index):
386 si_s = si_b2a(storage_index)
387 log.msg("storage: get_buckets %s" % si_s)
388 bucketreaders = {} # k: sharenum, v: BucketReader
389 for shnum, filename in self._get_bucket_shares(storage_index):
390 bucketreaders[shnum] = BucketReader(self, filename,
391 storage_index, shnum)
392 self.add_latency("get", time.time() - start)
395 def get_leases(self, storage_index):
396 """Provide an iterator that yields all of the leases attached to this
397 bucket. Each lease is returned as a LeaseInfo instance.
399 This method is not for client use.
402 # since all shares get the same lease data, we just grab the leases
403 # from the first share
405 shnum, filename = self._get_bucket_shares(storage_index).next()
406 sf = ShareFile(filename)
407 return sf.get_leases()
408 except StopIteration:
411 def remote_slot_testv_and_readv_and_writev(self, storage_index,
413 test_and_write_vectors,
417 si_s = si_b2a(storage_index)
418 log.msg("storage: slot_writev %s" % si_s)
419 si_dir = storage_index_to_dir(storage_index)
420 (write_enabler, renew_secret, cancel_secret) = secrets
421 # shares exist if there is a file for them
422 bucketdir = os.path.join(self.sharedir, si_dir)
424 if os.path.isdir(bucketdir):
425 for sharenum_s in os.listdir(bucketdir):
427 sharenum = int(sharenum_s)
430 filename = os.path.join(bucketdir, sharenum_s)
431 msf = MutableShareFile(filename, self)
432 msf.check_write_enabler(write_enabler, si_s)
433 shares[sharenum] = msf
434 # write_enabler is good for all existing shares.
436 # Now evaluate test vectors.
438 for sharenum in test_and_write_vectors:
439 (testv, datav, new_length) = test_and_write_vectors[sharenum]
440 if sharenum in shares:
441 if not shares[sharenum].check_testv(testv):
442 self.log("testv failed: [%d]: %r" % (sharenum, testv))
443 testv_is_good = False
446 # compare the vectors against an empty share, in which all
447 # reads return empty strings.
448 if not EmptyShare().check_testv(testv):
449 self.log("testv failed (empty): [%d] %r" % (sharenum,
451 testv_is_good = False
454 # now gather the read vectors, before we do any writes
456 for sharenum, share in shares.items():
457 read_data[sharenum] = share.readv(read_vector)
460 expire_time = time.time() + 31*24*60*60 # one month
461 lease_info = LeaseInfo(ownerid,
462 renew_secret, cancel_secret,
463 expire_time, self.my_nodeid)
466 # now apply the write vectors
467 for sharenum in test_and_write_vectors:
468 (testv, datav, new_length) = test_and_write_vectors[sharenum]
470 if sharenum in shares:
471 shares[sharenum].unlink()
473 if sharenum not in shares:
474 # allocate a new share
475 allocated_size = 2000 # arbitrary, really
476 share = self._allocate_slot_share(bucketdir, secrets,
480 shares[sharenum] = share
481 shares[sharenum].writev(datav, new_length)
482 # and update the lease
483 shares[sharenum].add_or_renew_lease(lease_info)
486 # delete empty bucket directories
487 if not os.listdir(bucketdir):
492 self.add_latency("writev", time.time() - start)
493 return (testv_is_good, read_data)
495 def _allocate_slot_share(self, bucketdir, secrets, sharenum,
496 allocated_size, owner_num=0):
497 (write_enabler, renew_secret, cancel_secret) = secrets
498 my_nodeid = self.my_nodeid
499 fileutil.make_dirs(bucketdir)
500 filename = os.path.join(bucketdir, "%d" % sharenum)
501 share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
505 def remote_slot_readv(self, storage_index, shares, readv):
508 si_s = si_b2a(storage_index)
509 lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
510 facility="tahoe.storage", level=log.OPERATIONAL)
511 si_dir = storage_index_to_dir(storage_index)
512 # shares exist if there is a file for them
513 bucketdir = os.path.join(self.sharedir, si_dir)
514 if not os.path.isdir(bucketdir):
515 self.add_latency("readv", time.time() - start)
518 for sharenum_s in os.listdir(bucketdir):
520 sharenum = int(sharenum_s)
523 if sharenum in shares or not shares:
524 filename = os.path.join(bucketdir, sharenum_s)
525 msf = MutableShareFile(filename, self)
526 datavs[sharenum] = msf.readv(readv)
527 log.msg("returning shares %s" % (datavs.keys(),),
528 facility="tahoe.storage", level=log.NOISY, parent=lp)
529 self.add_latency("readv", time.time() - start)
532 def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
534 fileutil.make_dirs(self.corruption_advisory_dir)
535 now = time_format.iso_utc(sep="T")
536 si_s = si_b2a(storage_index)
537 # windows can't handle colons in the filename
538 fn = os.path.join(self.corruption_advisory_dir,
539 "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
541 f.write("report: Share Corruption\n")
542 f.write("type: %s\n" % share_type)
543 f.write("storage_index: %s\n" % si_s)
544 f.write("share_number: %d\n" % shnum)
549 log.msg(format=("client claims corruption in (%(share_type)s) " +
550 "%(si)s-%(shnum)d: %(reason)s"),
551 share_type=share_type, si=si_s, shnum=shnum, reason=reason,
552 level=log.SCARY, umid="SGx2fA")