]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage/server.py
add "Available" column to welcome page (#648)
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage / server.py
1 import os, re, weakref, struct, time
2
3 from foolscap.api import Referenceable
4 from twisted.application import service
5
6 from zope.interface import implements
7 from allmydata.interfaces import RIStorageServer, IStatsProducer
8 from allmydata.util import fileutil, idlib, log, time_format
9 import allmydata # for __full_version__
10
11 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
12 _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
13 from allmydata.storage.lease import LeaseInfo
14 from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
15      create_mutable_sharefile
16 from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
17 from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
18 from allmydata.storage.crawler import BucketCountingCrawler
19 from allmydata.storage.expirer import LeaseCheckingCrawler
20
21 # storage/
22 # storage/shares/incoming
23 #   incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
24 #   be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
25 # storage/shares/$START/$STORAGEINDEX
26 # storage/shares/$START/$STORAGEINDEX/$SHARENUM
27
28 # Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
29 # base-32 chars).
30
31 # $SHARENUM matches this regex:
# $SHARENUM matches this regex (raw string; matches decimal digits only):
NUM_RE = re.compile(r"^[0-9]+$")
33
34
35
class StorageServer(service.MultiService, Referenceable):
    """
    A Foolscap-reachable storage server.

    Immutable and mutable shares are kept as files under
    storedir/shares/$START/$STORAGEINDEX/$SHARENUM; lease information lives
    inside the share files themselves. Two crawlers run as child services:
    a bucket counter and a lease checker/expirer.
    """
    implements(RIStorageServer, IStatsProducer)
    name = 'storage'
    # class-level hook so tests/subclasses can substitute a different crawler
    LeaseCheckerClass = LeaseCheckingCrawler

    def __init__(self, storedir, nodeid, reserved_space=0,
                 discard_storage=False, readonly_storage=False,
                 stats_provider=None,
                 expiration_enabled=False,
                 expiration_mode="age",
                 expiration_override_lease_duration=None,
                 expiration_cutoff_date=None,
                 expiration_sharetypes=("mutable", "immutable")):
        """
        Create the storage directories, discard any leftover partial
        uploads, and start the bucket-counting and lease-checking crawlers.

        @param storedir: base directory for shares/ and crawler state files
        @param nodeid: our 20-byte binary node id
        @param reserved_space: refuse new shares rather than let free disk
            space drop below this many bytes
        @param discard_storage: if True, accept uploads but throw the data
            away (test/debug mode)
        @param readonly_storage: if True, never accept new shares
        @param stats_provider: optional stats provider to register with
        @param expiration_*: passed through to the lease-checking crawler
        """
        service.MultiService.__init__(self)
        assert isinstance(nodeid, str)
        assert len(nodeid) == 20
        self.my_nodeid = nodeid
        self.storedir = storedir
        sharedir = os.path.join(storedir, "shares")
        fileutil.make_dirs(sharedir)
        self.sharedir = sharedir
        # we don't actually create the corruption-advisory dir until necessary
        self.corruption_advisory_dir = os.path.join(storedir,
                                                    "corruption-advisories")
        self.reserved_space = int(reserved_space)
        self.no_storage = discard_storage
        self.readonly_storage = readonly_storage
        self.stats_provider = stats_provider
        if self.stats_provider:
            self.stats_provider.register_producer(self)
        self.incomingdir = os.path.join(sharedir, 'incoming')
        self._clean_incomplete()
        fileutil.make_dirs(self.incomingdir)
        # weak keys: BucketWriters drop out automatically when closed/GCed
        self._active_writers = weakref.WeakKeyDictionary()
        log.msg("StorageServer created", facility="tahoe.storage")

        if reserved_space:
            if self.get_available_space() is None:
                log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
                        umin="0wZ27w", level=log.UNUSUAL)

        # per-category lists of recent operation latencies (seconds);
        # add_latency() caps each list at the most recent 1000 samples
        self.latencies = {"allocate": [], # immutable
                          "write": [],
                          "close": [],
                          "read": [],
                          "get": [],
                          "writev": [], # mutable
                          "readv": [],
                          "add-lease": [], # both
                          "renew": [],
                          "cancel": [],
                          }
        self.add_bucket_counter()

        statefile = os.path.join(self.storedir, "lease_checker.state")
        historyfile = os.path.join(self.storedir, "lease_checker.history")
        klass = self.LeaseCheckerClass
        self.lease_checker = klass(self, statefile, historyfile,
                                   expiration_enabled, expiration_mode,
                                   expiration_override_lease_duration,
                                   expiration_cutoff_date,
                                   expiration_sharetypes)
        self.lease_checker.setServiceParent(self)
99
100     def __repr__(self):
101         return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
102
103     def have_shares(self):
104         # quick test to decide if we need to commit to an implicit
105         # permutation-seed or if we should use a new one
106         return bool(set(os.listdir(self.sharedir)) - set(["incoming"]))
107
    def add_bucket_counter(self):
        """Create the bucket-counting crawler and attach it as a child
        service; it periodically counts how many buckets we hold."""
        statefile = os.path.join(self.storedir, "bucket_counter.state")
        self.bucket_counter = BucketCountingCrawler(self, statefile)
        self.bucket_counter.setServiceParent(self)
112
113     def count(self, name, delta=1):
114         if self.stats_provider:
115             self.stats_provider.count("storage_server." + name, delta)
116
117     def add_latency(self, category, latency):
118         a = self.latencies[category]
119         a.append(latency)
120         if len(a) > 1000:
121             self.latencies[category] = a[-1000:]
122
123     def get_latencies(self):
124         """Return a dict, indexed by category, that contains a dict of
125         latency numbers for each category. If there are sufficient samples
126         for unambiguous interpretation, each dict will contain the
127         following keys: mean, 01_0_percentile, 10_0_percentile,
128         50_0_percentile (median), 90_0_percentile, 95_0_percentile,
129         99_0_percentile, 99_9_percentile.  If there are insufficient
130         samples for a given percentile to be interpreted unambiguously
131         that percentile will be reported as None. If no samples have been
132         collected for the given category, then that category name will
133         not be present in the return value. """
134         # note that Amazon's Dynamo paper says they use 99.9% percentile.
135         output = {}
136         for category in self.latencies:
137             if not self.latencies[category]:
138                 continue
139             stats = {}
140             samples = self.latencies[category][:]
141             count = len(samples)
142             stats["samplesize"] = count
143             samples.sort()
144             if count > 1:
145                 stats["mean"] = sum(samples) / count
146             else:
147                 stats["mean"] = None
148
149             orderstatlist = [(0.01, "01_0_percentile", 100), (0.1, "10_0_percentile", 10),\
150                              (0.50, "50_0_percentile", 10), (0.90, "90_0_percentile", 10),\
151                              (0.95, "95_0_percentile", 20), (0.99, "99_0_percentile", 100),\
152                              (0.999, "99_9_percentile", 1000)]
153
154             for percentile, percentilestring, minnumtoobserve in orderstatlist:
155                 if count >= minnumtoobserve:
156                     stats[percentilestring] = samples[int(percentile*count)]
157                 else:
158                     stats[percentilestring] = None
159
160             output[category] = stats
161         return output
162
163     def log(self, *args, **kwargs):
164         if "facility" not in kwargs:
165             kwargs["facility"] = "tahoe.storage"
166         return log.msg(*args, **kwargs)
167
    def _clean_incomplete(self):
        # Discard any partially-uploaded shares left over from a previous
        # run; __init__ recreates an empty incoming/ directory afterwards.
        fileutil.rm_dir(self.incomingdir)
170
    def get_stats(self):
        """Return a dict of numeric stats for the stats provider.

        Includes allocated/reserved space, per-category latency figures,
        disk-usage numbers (when the platform can report them), whether we
        are accepting new immutable shares, and the last complete bucket
        count once the bucket-counting crawler has finished a cycle.
        """
        # remember: RIStatsProvider requires that our return dict
        # contains numeric values.
        stats = { 'storage_server.allocated': self.allocated_size(), }
        stats['storage_server.reserved_space'] = self.reserved_space
        for category,ld in self.get_latencies().items():
            for name,v in ld.items():
                stats['storage_server.latencies.%s.%s' % (category, name)] = v

        try:
            disk = fileutil.get_disk_stats(self.sharedir, self.reserved_space)
            writeable = disk['avail'] > 0

            # spacetime predictors should use disk_avail / (d(disk_used)/dt)
            stats['storage_server.disk_total'] = disk['total']
            stats['storage_server.disk_used'] = disk['used']
            stats['storage_server.disk_free_for_root'] = disk['free_for_root']
            stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
            stats['storage_server.disk_avail'] = disk['avail']
        except AttributeError:
            # this platform has no API for disk stats at all: optimistically
            # assume we are writeable
            writeable = True
        except EnvironmentError:
            # the API exists but the OS call failed: be conservative
            log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
            writeable = False

        if self.readonly_storage:
            # read-only overrides whatever the disk probe said
            stats['storage_server.disk_avail'] = 0
            writeable = False

        stats['storage_server.accepting_immutable_shares'] = int(writeable)
        s = self.bucket_counter.get_state()
        bucket_count = s.get("last-complete-bucket-count")
        if bucket_count:
            stats['storage_server.total_bucket_count'] = bucket_count
        return stats
206
207     def get_available_space(self):
208         """Returns available space for share storage in bytes, or None if no
209         API to get this information is available."""
210
211         if self.readonly_storage:
212             return 0
213         return fileutil.get_available_space(self.sharedir, self.reserved_space)
214
215     def allocated_size(self):
216         space = 0
217         for bw in self._active_writers:
218             space += bw.allocated_size()
219         return space
220
221     def remote_get_version(self):
222         remaining_space = self.get_available_space()
223         if remaining_space is None:
224             # We're on a platform that has no API to get disk stats.
225             remaining_space = 2**64
226
227         version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
228                     { "maximum-immutable-share-size": remaining_space,
229                       "maximum-mutable-share-size": MAX_MUTABLE_SHARE_SIZE,
230                       "available-space": remaining_space,
231                       "tolerates-immutable-read-overrun": True,
232                       "delete-mutable-shares-with-zero-length-writev": True,
233                       "fills-holes-with-zero-bytes": True,
234                       "prevents-read-past-end-of-share-data": True,
235                       },
236                     "application-version": str(allmydata.__full_version__),
237                     }
238         return version
239
    def remote_allocate_buckets(self, storage_index,
                                renew_secret, cancel_secret,
                                sharenums, allocated_size,
                                canary, owner_num=0):
        """Allocate space for the given share numbers of one bucket.

        @param sharenums: share numbers the client wants to upload
        @param allocated_size: maximum size, in bytes, of each share
        @param canary: Foolscap canary used to abort writes if the client
            disconnects
        @return: (alreadygot, bucketwriters) where 'alreadygot' is the set
            of share numbers we already hold (leases renewed), and
            'bucketwriters' maps the newly-allocated share numbers to
            BucketWriter instances. Shares for which we lack space, or
            which are mid-upload in incoming/, appear in neither.
        """
        # owner_num is not for clients to set, but rather it should be
        # curried into the PersonalStorageServer instance that is dedicated
        # to a particular owner.
        start = time.time()
        self.count("allocate")
        alreadygot = set()
        bucketwriters = {} # k: shnum, v: BucketWriter
        si_dir = storage_index_to_dir(storage_index)
        si_s = si_b2a(storage_index)

        log.msg("storage: allocate_buckets %s" % si_s)

        # in this implementation, the lease information (including secrets)
        # goes into the share files themselves. It could also be put into a
        # separate database. Note that the lease should not be added until
        # the BucketWriter has been closed.
        expire_time = time.time() + 31*24*60*60
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        max_space_per_bucket = allocated_size

        remaining_space = self.get_available_space()
        limited = remaining_space is not None
        if limited:
            # this is a bit conservative, since some of this allocated_size()
            # has already been written to disk, where it will show up in
            # get_available_space.
            remaining_space -= self.allocated_size()
        # self.readonly_storage causes remaining_space <= 0

        # fill alreadygot with all shares that we have, not just the ones
        # they asked about: this will save them a lot of work. Add or update
        # leases for all of them: if they want us to hold shares for this
        # file, they'll want us to hold leases for this file.
        for (shnum, fn) in self._get_bucket_shares(storage_index):
            alreadygot.add(shnum)
            sf = ShareFile(fn)
            sf.add_or_renew_lease(lease_info)

        for shnum in sharenums:
            incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
            finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
            if os.path.exists(finalhome):
                # great! we already have it. easy.
                pass
            elif os.path.exists(incominghome):
                # Note that we don't create BucketWriters for shnums that
                # have a partial share (in incoming/), so if a second upload
                # occurs while the first is still in progress, the second
                # uploader will use different storage servers.
                pass
            elif (not limited) or (remaining_space >= max_space_per_bucket):
                # ok! we need to create the new share file.
                bw = BucketWriter(self, incominghome, finalhome,
                                  max_space_per_bucket, lease_info, canary)
                if self.no_storage:
                    # discard mode: accept the bytes but never store them
                    bw.throw_out_all_data = True
                bucketwriters[shnum] = bw
                self._active_writers[bw] = 1
                if limited:
                    remaining_space -= max_space_per_bucket
            else:
                # bummer! not enough space to accept this bucket
                pass

        if bucketwriters:
            fileutil.make_dirs(os.path.join(self.sharedir, si_dir))

        self.add_latency("allocate", time.time() - start)
        return alreadygot, bucketwriters
316
317     def _iter_share_files(self, storage_index):
318         for shnum, filename in self._get_bucket_shares(storage_index):
319             f = open(filename, 'rb')
320             header = f.read(32)
321             f.close()
322             if header[:32] == MutableShareFile.MAGIC:
323                 sf = MutableShareFile(filename, self)
324                 # note: if the share has been migrated, the renew_lease()
325                 # call will throw an exception, with information to help the
326                 # client update the lease.
327             elif header[:4] == struct.pack(">L", 1):
328                 sf = ShareFile(filename)
329             else:
330                 continue # non-sharefile
331             yield sf
332
333     def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
334                          owner_num=1):
335         start = time.time()
336         self.count("add-lease")
337         new_expire_time = time.time() + 31*24*60*60
338         lease_info = LeaseInfo(owner_num,
339                                renew_secret, cancel_secret,
340                                new_expire_time, self.my_nodeid)
341         for sf in self._iter_share_files(storage_index):
342             sf.add_or_renew_lease(lease_info)
343         self.add_latency("add-lease", time.time() - start)
344         return None
345
346     def remote_renew_lease(self, storage_index, renew_secret):
347         start = time.time()
348         self.count("renew")
349         new_expire_time = time.time() + 31*24*60*60
350         found_buckets = False
351         for sf in self._iter_share_files(storage_index):
352             found_buckets = True
353             sf.renew_lease(renew_secret, new_expire_time)
354         self.add_latency("renew", time.time() - start)
355         if not found_buckets:
356             raise IndexError("no such lease to renew")
357
    def bucket_writer_closed(self, bw, consumed_size):
        """Called by a BucketWriter when its upload completes.

        Records the number of bytes actually consumed and retires the
        writer from the active set (which also shrinks allocated_size()).
        """
        if self.stats_provider:
            self.stats_provider.count('storage_server.bytes_added', consumed_size)
        del self._active_writers[bw]
362
363     def _get_bucket_shares(self, storage_index):
364         """Return a list of (shnum, pathname) tuples for files that hold
365         shares for this storage_index. In each tuple, 'shnum' will always be
366         the integer form of the last component of 'pathname'."""
367         storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
368         try:
369             for f in os.listdir(storagedir):
370                 if NUM_RE.match(f):
371                     filename = os.path.join(storagedir, f)
372                     yield (int(f), filename)
373         except OSError:
374             # Commonly caused by there being no buckets at all.
375             pass
376
377     def remote_get_buckets(self, storage_index):
378         start = time.time()
379         self.count("get")
380         si_s = si_b2a(storage_index)
381         log.msg("storage: get_buckets %s" % si_s)
382         bucketreaders = {} # k: sharenum, v: BucketReader
383         for shnum, filename in self._get_bucket_shares(storage_index):
384             bucketreaders[shnum] = BucketReader(self, filename,
385                                                 storage_index, shnum)
386         self.add_latency("get", time.time() - start)
387         return bucketreaders
388
389     def get_leases(self, storage_index):
390         """Provide an iterator that yields all of the leases attached to this
391         bucket. Each lease is returned as a LeaseInfo instance.
392
393         This method is not for client use.
394         """
395
396         # since all shares get the same lease data, we just grab the leases
397         # from the first share
398         try:
399             shnum, filename = self._get_bucket_shares(storage_index).next()
400             sf = ShareFile(filename)
401             return sf.get_leases()
402         except StopIteration:
403             return iter([])
404
405     def remote_slot_testv_and_readv_and_writev(self, storage_index,
406                                                secrets,
407                                                test_and_write_vectors,
408                                                read_vector):
409         start = time.time()
410         self.count("writev")
411         si_s = si_b2a(storage_index)
412         log.msg("storage: slot_writev %s" % si_s)
413         si_dir = storage_index_to_dir(storage_index)
414         (write_enabler, renew_secret, cancel_secret) = secrets
415         # shares exist if there is a file for them
416         bucketdir = os.path.join(self.sharedir, si_dir)
417         shares = {}
418         if os.path.isdir(bucketdir):
419             for sharenum_s in os.listdir(bucketdir):
420                 try:
421                     sharenum = int(sharenum_s)
422                 except ValueError:
423                     continue
424                 filename = os.path.join(bucketdir, sharenum_s)
425                 msf = MutableShareFile(filename, self)
426                 msf.check_write_enabler(write_enabler, si_s)
427                 shares[sharenum] = msf
428         # write_enabler is good for all existing shares.
429
430         # Now evaluate test vectors.
431         testv_is_good = True
432         for sharenum in test_and_write_vectors:
433             (testv, datav, new_length) = test_and_write_vectors[sharenum]
434             if sharenum in shares:
435                 if not shares[sharenum].check_testv(testv):
436                     self.log("testv failed: [%d]: %r" % (sharenum, testv))
437                     testv_is_good = False
438                     break
439             else:
440                 # compare the vectors against an empty share, in which all
441                 # reads return empty strings.
442                 if not EmptyShare().check_testv(testv):
443                     self.log("testv failed (empty): [%d] %r" % (sharenum,
444                                                                 testv))
445                     testv_is_good = False
446                     break
447
448         # now gather the read vectors, before we do any writes
449         read_data = {}
450         for sharenum, share in shares.items():
451             read_data[sharenum] = share.readv(read_vector)
452
453         ownerid = 1 # TODO
454         expire_time = time.time() + 31*24*60*60   # one month
455         lease_info = LeaseInfo(ownerid,
456                                renew_secret, cancel_secret,
457                                expire_time, self.my_nodeid)
458
459         if testv_is_good:
460             # now apply the write vectors
461             for sharenum in test_and_write_vectors:
462                 (testv, datav, new_length) = test_and_write_vectors[sharenum]
463                 if new_length == 0:
464                     if sharenum in shares:
465                         shares[sharenum].unlink()
466                 else:
467                     if sharenum not in shares:
468                         # allocate a new share
469                         allocated_size = 2000 # arbitrary, really
470                         share = self._allocate_slot_share(bucketdir, secrets,
471                                                           sharenum,
472                                                           allocated_size,
473                                                           owner_num=0)
474                         shares[sharenum] = share
475                     shares[sharenum].writev(datav, new_length)
476                     # and update the lease
477                     shares[sharenum].add_or_renew_lease(lease_info)
478
479             if new_length == 0:
480                 # delete empty bucket directories
481                 if not os.listdir(bucketdir):
482                     os.rmdir(bucketdir)
483
484
485         # all done
486         self.add_latency("writev", time.time() - start)
487         return (testv_is_good, read_data)
488
    def _allocate_slot_share(self, bucketdir, secrets, sharenum,
                             allocated_size, owner_num=0):
        """Create (and return) a brand-new mutable share file.

        Only the write_enabler from 'secrets' is consumed here; the lease
        secrets are applied by the caller afterwards. 'allocated_size' and
        'owner_num' are currently unused.
        """
        (write_enabler, renew_secret, cancel_secret) = secrets
        my_nodeid = self.my_nodeid
        fileutil.make_dirs(bucketdir)
        filename = os.path.join(bucketdir, "%d" % sharenum)
        share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
                                         self)
        return share
498
499     def remote_slot_readv(self, storage_index, shares, readv):
500         start = time.time()
501         self.count("readv")
502         si_s = si_b2a(storage_index)
503         lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
504                      facility="tahoe.storage", level=log.OPERATIONAL)
505         si_dir = storage_index_to_dir(storage_index)
506         # shares exist if there is a file for them
507         bucketdir = os.path.join(self.sharedir, si_dir)
508         if not os.path.isdir(bucketdir):
509             self.add_latency("readv", time.time() - start)
510             return {}
511         datavs = {}
512         for sharenum_s in os.listdir(bucketdir):
513             try:
514                 sharenum = int(sharenum_s)
515             except ValueError:
516                 continue
517             if sharenum in shares or not shares:
518                 filename = os.path.join(bucketdir, sharenum_s)
519                 msf = MutableShareFile(filename, self)
520                 datavs[sharenum] = msf.readv(readv)
521         log.msg("returning shares %s" % (datavs.keys(),),
522                 facility="tahoe.storage", level=log.NOISY, parent=lp)
523         self.add_latency("readv", time.time() - start)
524         return datavs
525
526     def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
527                                     reason):
528         fileutil.make_dirs(self.corruption_advisory_dir)
529         now = time_format.iso_utc(sep="T")
530         si_s = si_b2a(storage_index)
531         # windows can't handle colons in the filename
532         fn = os.path.join(self.corruption_advisory_dir,
533                           "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
534         f = open(fn, "w")
535         f.write("report: Share Corruption\n")
536         f.write("type: %s\n" % share_type)
537         f.write("storage_index: %s\n" % si_s)
538         f.write("share_number: %d\n" % shnum)
539         f.write("\n")
540         f.write(reason)
541         f.write("\n")
542         f.close()
543         log.msg(format=("client claims corruption in (%(share_type)s) " +
544                         "%(si)s-%(shnum)d: %(reason)s"),
545                 share_type=share_type, si=si_s, shnum=shnum, reason=reason,
546                 level=log.SCARY, umid="SGx2fA")
547         return None