# tahoe-lafs/tahoe-lafs.git -- src/allmydata/storage/server.py
# Commit: "GC: add date-cutoff -based expiration, add proposed docs"
1 import os, re, weakref, struct, time
2
3 from foolscap import Referenceable
4 from twisted.application import service
5
6 from zope.interface import implements
7 from allmydata.interfaces import RIStorageServer, IStatsProducer
8 from allmydata.util import fileutil, log, time_format
9 import allmydata # for __full_version__
10
11 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
12 _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
13 from allmydata.storage.lease import LeaseInfo
14 from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
15      create_mutable_sharefile
16 from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
17 from allmydata.storage.crawler import BucketCountingCrawler
18 from allmydata.storage.expirer import LeaseCheckingCrawler
19
20 # storage/
21 # storage/shares/incoming
22 #   incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
23 #   be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
24 # storage/shares/$START/$STORAGEINDEX
25 # storage/shares/$START/$STORAGEINDEX/$SHARENUM
26
27 # Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
28 # base-32 chars).
29
30 # $SHARENUM matches this regex:
31 NUM_RE=re.compile("^[0-9]+$")
32
33
34
class StorageServer(service.MultiService, Referenceable):
    """Foolscap-accessible storage server.

    Stores immutable and mutable share files under ``storedir``, tracks
    per-operation latencies, and runs two background crawlers: a bucket
    counter and a lease checker/expirer.
    """
    implements(RIStorageServer, IStatsProducer)
    name = 'storage'
    LeaseCheckerClass = LeaseCheckingCrawler

    def __init__(self, storedir, nodeid, reserved_space=0,
                 discard_storage=False, readonly_storage=False,
                 stats_provider=None,
                 expiration_enabled=False,
                 expiration_mode=("age", 31*24*60*60)):
        service.MultiService.__init__(self)
        assert isinstance(nodeid, str)
        assert len(nodeid) == 20
        self.my_nodeid = nodeid
        self.storedir = storedir
        sharedir = os.path.join(storedir, "shares")
        fileutil.make_dirs(sharedir)
        self.sharedir = sharedir
        # we don't actually create the corruption-advisory dir until necessary
        self.corruption_advisory_dir = os.path.join(storedir,
                                                    "corruption-advisories")
        self.reserved_space = int(reserved_space)
        self.no_storage = discard_storage
        self.readonly_storage = readonly_storage
        self.stats_provider = stats_provider
        if self.stats_provider:
            self.stats_provider.register_producer(self)
        self.incomingdir = os.path.join(sharedir, 'incoming')
        self._clean_incomplete()
        fileutil.make_dirs(self.incomingdir)
        self._active_writers = weakref.WeakKeyDictionary()
        log.msg("StorageServer created", facility="tahoe.storage")

        if reserved_space:
            if self.get_available_space() is None:
                # BUGFIX: the keyword was misspelled 'umin='; foolscap's
                # log.msg takes 'umid=' (cf. the umid= use in
                # remote_advise_corrupt_share below).
                log.msg("warning: [storage]reserved_space= is set, but this platform does not support statvfs(2), so this reservation cannot be honored",
                        umid="0wZ27w", level=log.UNUSUAL)

        self.latencies = {"allocate": [], # immutable
                          "write": [],
                          "close": [],
                          "read": [],
                          "get": [],
                          "writev": [], # mutable
                          "readv": [],
                          "add-lease": [], # both
                          "renew": [],
                          "cancel": [],
                          }
        self.add_bucket_counter()
        self.add_lease_checker(expiration_enabled, expiration_mode)
86
87     def add_bucket_counter(self):
88         statefile = os.path.join(self.storedir, "bucket_counter.state")
89         self.bucket_counter = BucketCountingCrawler(self, statefile)
90         self.bucket_counter.setServiceParent(self)
91
92     def add_lease_checker(self, expiration_enabled, expiration_mode):
93         statefile = os.path.join(self.storedir, "lease_checker.state")
94         historyfile = os.path.join(self.storedir, "lease_checker.history")
95         klass = self.LeaseCheckerClass
96         self.lease_checker = klass(self, statefile, historyfile,
97                                    expiration_enabled=expiration_enabled,
98                                    expiration_mode=expiration_mode)
99         self.lease_checker.setServiceParent(self)
100
101     def count(self, name, delta=1):
102         if self.stats_provider:
103             self.stats_provider.count("storage_server." + name, delta)
104
105     def add_latency(self, category, latency):
106         a = self.latencies[category]
107         a.append(latency)
108         if len(a) > 1000:
109             self.latencies[category] = a[-1000:]
110
111     def get_latencies(self):
112         """Return a dict, indexed by category, that contains a dict of
113         latency numbers for each category. Each dict will contain the
114         following keys: mean, 01_0_percentile, 10_0_percentile,
115         50_0_percentile (median), 90_0_percentile, 95_0_percentile,
116         99_0_percentile, 99_9_percentile. If no samples have been collected
117         for the given category, then that category name will not be present
118         in the return value."""
119         # note that Amazon's Dynamo paper says they use 99.9% percentile.
120         output = {}
121         for category in self.latencies:
122             if not self.latencies[category]:
123                 continue
124             stats = {}
125             samples = self.latencies[category][:]
126             samples.sort()
127             count = len(samples)
128             stats["mean"] = sum(samples) / count
129             stats["01_0_percentile"] = samples[int(0.01 * count)]
130             stats["10_0_percentile"] = samples[int(0.1 * count)]
131             stats["50_0_percentile"] = samples[int(0.5 * count)]
132             stats["90_0_percentile"] = samples[int(0.9 * count)]
133             stats["95_0_percentile"] = samples[int(0.95 * count)]
134             stats["99_0_percentile"] = samples[int(0.99 * count)]
135             stats["99_9_percentile"] = samples[int(0.999 * count)]
136             output[category] = stats
137         return output
138
139     def log(self, *args, **kwargs):
140         if "facility" not in kwargs:
141             kwargs["facility"] = "tahoe.storage"
142         return log.msg(*args, **kwargs)
143
    def _clean_incomplete(self):
        # Discard any partially-uploaded shares left over from a previous
        # run: everything under shares/incoming is incomplete by definition.
        fileutil.rm_dir(self.incomingdir)
146
    def do_statvfs(self):
        # os.statvfs exists only on unix; callers (get_stats) catch the
        # AttributeError raised on platforms that lack it.
        return os.statvfs(self.storedir)
149
150     def get_stats(self):
151         # remember: RIStatsProvider requires that our return dict
152         # contains numeric values.
153         stats = { 'storage_server.allocated': self.allocated_size(), }
154         stats["storage_server.reserved_space"] = self.reserved_space
155         for category,ld in self.get_latencies().items():
156             for name,v in ld.items():
157                 stats['storage_server.latencies.%s.%s' % (category, name)] = v
158         writeable = True
159         if self.readonly_storage:
160             writeable = False
161         try:
162             s = self.do_statvfs()
163             # on my mac laptop:
164             #  statvfs(2) is a wrapper around statfs(2).
165             #    statvfs.f_frsize = statfs.f_bsize :
166             #     "minimum unit of allocation" (statvfs)
167             #     "fundamental file system block size" (statfs)
168             #    statvfs.f_bsize = statfs.f_iosize = stat.st_blocks : preferred IO size
169             # on an encrypted home directory ("FileVault"), it gets f_blocks
170             # wrong, and s.f_blocks*s.f_frsize is twice the size of my disk,
171             # but s.f_bavail*s.f_frsize is correct
172
173             disk_total = s.f_frsize * s.f_blocks
174             disk_used = s.f_frsize * (s.f_blocks - s.f_bfree)
175             # spacetime predictors should look at the slope of disk_used.
176             disk_free_for_root = s.f_frsize * s.f_bfree
177             disk_free_for_nonroot = s.f_frsize * s.f_bavail
178
179             # include our local policy here: if we stop accepting shares when
180             # the available space drops below 1GB, then include that fact in
181             # disk_avail.
182             disk_avail = disk_free_for_nonroot - self.reserved_space
183             disk_avail = max(disk_avail, 0)
184             if self.readonly_storage:
185                 disk_avail = 0
186             if disk_avail == 0:
187                 writeable = False
188
189             # spacetime predictors should use disk_avail / (d(disk_used)/dt)
190             stats["storage_server.disk_total"] = disk_total
191             stats["storage_server.disk_used"] = disk_used
192             stats["storage_server.disk_free_for_root"] = disk_free_for_root
193             stats["storage_server.disk_free_for_nonroot"] = disk_free_for_nonroot
194             stats["storage_server.disk_avail"] = disk_avail
195         except AttributeError:
196             # os.statvfs is available only on unix
197             pass
198         stats["storage_server.accepting_immutable_shares"] = int(writeable)
199         s = self.bucket_counter.get_state()
200         bucket_count = s.get("last-complete-bucket-count")
201         if bucket_count:
202             stats["storage_server.total_bucket_count"] = bucket_count
203         return stats
204
205
206     def stat_disk(self, d):
207         s = os.statvfs(d)
208         # s.f_bavail: available to non-root users
209         disk_avail = s.f_frsize * s.f_bavail
210         return disk_avail
211
212     def get_available_space(self):
213         # returns None if it cannot be measured (windows)
214         try:
215             disk_avail = self.stat_disk(self.storedir)
216             disk_avail -= self.reserved_space
217         except AttributeError:
218             disk_avail = None
219         if self.readonly_storage:
220             disk_avail = 0
221         return disk_avail
222
223     def allocated_size(self):
224         space = 0
225         for bw in self._active_writers:
226             space += bw.allocated_size()
227         return space
228
229     def remote_get_version(self):
230         remaining_space = self.get_available_space()
231         if remaining_space is None:
232             # we're on a platform that doesn't have 'df', so make a vague
233             # guess.
234             remaining_space = 2**64
235         version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
236                     { "maximum-immutable-share-size": remaining_space,
237                       "tolerates-immutable-read-overrun": True,
238                       "delete-mutable-shares-with-zero-length-writev": True,
239                       },
240                     "application-version": str(allmydata.__full_version__),
241                     }
242         return version
243
    def remote_allocate_buckets(self, storage_index,
                                renew_secret, cancel_secret,
                                sharenums, allocated_size,
                                canary, owner_num=0):
        """Allocate BucketWriters for the requested immutable shares.

        Returns (alreadygot, bucketwriters): 'alreadygot' is the set of
        share numbers already held on disk (their leases are renewed as a
        side effect), and 'bucketwriters' maps each newly-allocated share
        number to a BucketWriter. A requested share appears in neither when
        an upload for it is already in progress (a file exists under
        incoming/) or when remaining disk space is insufficient.
        """
        # owner_num is not for clients to set, but rather it should be
        # curried into the PersonalStorageServer instance that is dedicated
        # to a particular owner.
        start = time.time()
        self.count("allocate")
        alreadygot = set()
        bucketwriters = {} # k: shnum, v: BucketWriter
        si_dir = storage_index_to_dir(storage_index)
        si_s = si_b2a(storage_index)

        log.msg("storage: allocate_buckets %s" % si_s)

        # in this implementation, the lease information (including secrets)
        # goes into the share files themselves. It could also be put into a
        # separate database. Note that the lease should not be added until
        # the BucketWriter has been closed.
        expire_time = time.time() + 31*24*60*60
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        max_space_per_bucket = allocated_size

        # remaining_space is None when the platform cannot measure free
        # space (see get_available_space); in that case we never refuse.
        remaining_space = self.get_available_space()
        limited = remaining_space is not None
        if limited:
            # this is a bit conservative, since some of this allocated_size()
            # has already been written to disk, where it will show up in
            # get_available_space.
            remaining_space -= self.allocated_size()

        # fill alreadygot with all shares that we have, not just the ones
        # they asked about: this will save them a lot of work. Add or update
        # leases for all of them: if they want us to hold shares for this
        # file, they'll want us to hold leases for this file.
        for (shnum, fn) in self._get_bucket_shares(storage_index):
            alreadygot.add(shnum)
            sf = ShareFile(fn)
            sf.add_or_renew_lease(lease_info)

        # self.readonly_storage causes remaining_space=0

        for shnum in sharenums:
            incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
            finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
            if os.path.exists(finalhome):
                # great! we already have it. easy.
                pass
            elif os.path.exists(incominghome):
                # Note that we don't create BucketWriters for shnums that
                # have a partial share (in incoming/), so if a second upload
                # occurs while the first is still in progress, the second
                # uploader will use different storage servers.
                pass
            elif (not limited) or (remaining_space >= max_space_per_bucket):
                # ok! we need to create the new share file.
                bw = BucketWriter(self, incominghome, finalhome,
                                  max_space_per_bucket, lease_info, canary)
                if self.no_storage:
                    # discard mode: accept the bytes but never write them
                    bw.throw_out_all_data = True
                bucketwriters[shnum] = bw
                self._active_writers[bw] = 1
                if limited:
                    remaining_space -= max_space_per_bucket
            else:
                # bummer! not enough space to accept this bucket
                pass

        if bucketwriters:
            fileutil.make_dirs(os.path.join(self.sharedir, si_dir))

        self.add_latency("allocate", time.time() - start)
        return alreadygot, bucketwriters
321
322     def _iter_share_files(self, storage_index):
323         for shnum, filename in self._get_bucket_shares(storage_index):
324             f = open(filename, 'rb')
325             header = f.read(32)
326             f.close()
327             if header[:32] == MutableShareFile.MAGIC:
328                 sf = MutableShareFile(filename, self)
329                 # note: if the share has been migrated, the renew_lease()
330                 # call will throw an exception, with information to help the
331                 # client update the lease.
332             elif header[:4] == struct.pack(">L", 1):
333                 sf = ShareFile(filename)
334             else:
335                 continue # non-sharefile
336             yield sf
337
338     def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
339                          owner_num=1):
340         start = time.time()
341         self.count("add-lease")
342         new_expire_time = time.time() + 31*24*60*60
343         lease_info = LeaseInfo(owner_num,
344                                renew_secret, cancel_secret,
345                                new_expire_time, self.my_nodeid)
346         for sf in self._iter_share_files(storage_index):
347             sf.add_or_renew_lease(lease_info)
348         self.add_latency("add-lease", time.time() - start)
349         return None
350
351     def remote_renew_lease(self, storage_index, renew_secret):
352         start = time.time()
353         self.count("renew")
354         new_expire_time = time.time() + 31*24*60*60
355         found_buckets = False
356         for sf in self._iter_share_files(storage_index):
357             found_buckets = True
358             sf.renew_lease(renew_secret, new_expire_time)
359         self.add_latency("renew", time.time() - start)
360         if not found_buckets:
361             raise IndexError("no such lease to renew")
362
363     def remote_cancel_lease(self, storage_index, cancel_secret):
364         start = time.time()
365         self.count("cancel")
366
367         total_space_freed = 0
368         found_buckets = False
369         for sf in self._iter_share_files(storage_index):
370             # note: if we can't find a lease on one share, we won't bother
371             # looking in the others. Unless something broke internally
372             # (perhaps we ran out of disk space while adding a lease), the
373             # leases on all shares will be identical.
374             found_buckets = True
375             # this raises IndexError if the lease wasn't present XXXX
376             total_space_freed += sf.cancel_lease(cancel_secret)
377
378         if found_buckets:
379             storagedir = os.path.join(self.sharedir,
380                                       storage_index_to_dir(storage_index))
381             if not os.listdir(storagedir):
382                 os.rmdir(storagedir)
383
384         if self.stats_provider:
385             self.stats_provider.count('storage_server.bytes_freed',
386                                       total_space_freed)
387         self.add_latency("cancel", time.time() - start)
388         if not found_buckets:
389             raise IndexError("no such storage index")
390
391     def bucket_writer_closed(self, bw, consumed_size):
392         if self.stats_provider:
393             self.stats_provider.count('storage_server.bytes_added', consumed_size)
394         del self._active_writers[bw]
395
396     def _get_bucket_shares(self, storage_index):
397         """Return a list of (shnum, pathname) tuples for files that hold
398         shares for this storage_index. In each tuple, 'shnum' will always be
399         the integer form of the last component of 'pathname'."""
400         storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
401         try:
402             for f in os.listdir(storagedir):
403                 if NUM_RE.match(f):
404                     filename = os.path.join(storagedir, f)
405                     yield (int(f), filename)
406         except OSError:
407             # Commonly caused by there being no buckets at all.
408             pass
409
410     def remote_get_buckets(self, storage_index):
411         start = time.time()
412         self.count("get")
413         si_s = si_b2a(storage_index)
414         log.msg("storage: get_buckets %s" % si_s)
415         bucketreaders = {} # k: sharenum, v: BucketReader
416         for shnum, filename in self._get_bucket_shares(storage_index):
417             bucketreaders[shnum] = BucketReader(self, filename,
418                                                 storage_index, shnum)
419         self.add_latency("get", time.time() - start)
420         return bucketreaders
421
422     def get_leases(self, storage_index):
423         """Provide an iterator that yields all of the leases attached to this
424         bucket. Each lease is returned as a LeaseInfo instance.
425
426         This method is not for client use.
427         """
428
429         # since all shares get the same lease data, we just grab the leases
430         # from the first share
431         try:
432             shnum, filename = self._get_bucket_shares(storage_index).next()
433             sf = ShareFile(filename)
434             return sf.get_leases()
435         except StopIteration:
436             return iter([])
437
438     def remote_slot_testv_and_readv_and_writev(self, storage_index,
439                                                secrets,
440                                                test_and_write_vectors,
441                                                read_vector):
442         start = time.time()
443         self.count("writev")
444         si_s = si_b2a(storage_index)
445         lp = log.msg("storage: slot_writev %s" % si_s)
446         si_dir = storage_index_to_dir(storage_index)
447         (write_enabler, renew_secret, cancel_secret) = secrets
448         # shares exist if there is a file for them
449         bucketdir = os.path.join(self.sharedir, si_dir)
450         shares = {}
451         if os.path.isdir(bucketdir):
452             for sharenum_s in os.listdir(bucketdir):
453                 try:
454                     sharenum = int(sharenum_s)
455                 except ValueError:
456                     continue
457                 filename = os.path.join(bucketdir, sharenum_s)
458                 msf = MutableShareFile(filename, self)
459                 msf.check_write_enabler(write_enabler, si_s)
460                 shares[sharenum] = msf
461         # write_enabler is good for all existing shares.
462
463         # Now evaluate test vectors.
464         testv_is_good = True
465         for sharenum in test_and_write_vectors:
466             (testv, datav, new_length) = test_and_write_vectors[sharenum]
467             if sharenum in shares:
468                 if not shares[sharenum].check_testv(testv):
469                     self.log("testv failed: [%d]: %r" % (sharenum, testv))
470                     testv_is_good = False
471                     break
472             else:
473                 # compare the vectors against an empty share, in which all
474                 # reads return empty strings.
475                 if not EmptyShare().check_testv(testv):
476                     self.log("testv failed (empty): [%d] %r" % (sharenum,
477                                                                 testv))
478                     testv_is_good = False
479                     break
480
481         # now gather the read vectors, before we do any writes
482         read_data = {}
483         for sharenum, share in shares.items():
484             read_data[sharenum] = share.readv(read_vector)
485
486         ownerid = 1 # TODO
487         expire_time = time.time() + 31*24*60*60   # one month
488         lease_info = LeaseInfo(ownerid,
489                                renew_secret, cancel_secret,
490                                expire_time, self.my_nodeid)
491
492         if testv_is_good:
493             # now apply the write vectors
494             for sharenum in test_and_write_vectors:
495                 (testv, datav, new_length) = test_and_write_vectors[sharenum]
496                 if new_length == 0:
497                     if sharenum in shares:
498                         shares[sharenum].unlink()
499                 else:
500                     if sharenum not in shares:
501                         # allocate a new share
502                         allocated_size = 2000 # arbitrary, really
503                         share = self._allocate_slot_share(bucketdir, secrets,
504                                                           sharenum,
505                                                           allocated_size,
506                                                           owner_num=0)
507                         shares[sharenum] = share
508                     shares[sharenum].writev(datav, new_length)
509                     # and update the lease
510                     shares[sharenum].add_or_renew_lease(lease_info)
511
512             if new_length == 0:
513                 # delete empty bucket directories
514                 if not os.listdir(bucketdir):
515                     os.rmdir(bucketdir)
516
517
518         # all done
519         self.add_latency("writev", time.time() - start)
520         return (testv_is_good, read_data)
521
522     def _allocate_slot_share(self, bucketdir, secrets, sharenum,
523                              allocated_size, owner_num=0):
524         (write_enabler, renew_secret, cancel_secret) = secrets
525         my_nodeid = self.my_nodeid
526         fileutil.make_dirs(bucketdir)
527         filename = os.path.join(bucketdir, "%d" % sharenum)
528         share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
529                                          self)
530         return share
531
532     def remote_slot_readv(self, storage_index, shares, readv):
533         start = time.time()
534         self.count("readv")
535         si_s = si_b2a(storage_index)
536         lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
537                      facility="tahoe.storage", level=log.OPERATIONAL)
538         si_dir = storage_index_to_dir(storage_index)
539         # shares exist if there is a file for them
540         bucketdir = os.path.join(self.sharedir, si_dir)
541         if not os.path.isdir(bucketdir):
542             self.add_latency("readv", time.time() - start)
543             return {}
544         datavs = {}
545         for sharenum_s in os.listdir(bucketdir):
546             try:
547                 sharenum = int(sharenum_s)
548             except ValueError:
549                 continue
550             if sharenum in shares or not shares:
551                 filename = os.path.join(bucketdir, sharenum_s)
552                 msf = MutableShareFile(filename, self)
553                 datavs[sharenum] = msf.readv(readv)
554         log.msg("returning shares %s" % (datavs.keys(),),
555                 facility="tahoe.storage", level=log.NOISY, parent=lp)
556         self.add_latency("readv", time.time() - start)
557         return datavs
558
559     def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
560                                     reason):
561         fileutil.make_dirs(self.corruption_advisory_dir)
562         now = time_format.iso_utc(sep="T")
563         si_s = si_b2a(storage_index)
564         # windows can't handle colons in the filename
565         fn = os.path.join(self.corruption_advisory_dir,
566                           "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
567         f = open(fn, "w")
568         f.write("report: Share Corruption\n")
569         f.write("type: %s\n" % share_type)
570         f.write("storage_index: %s\n" % si_s)
571         f.write("share_number: %d\n" % shnum)
572         f.write("\n")
573         f.write(reason)
574         f.write("\n")
575         f.close()
576         log.msg(format=("client claims corruption in (%(share_type)s) " +
577                         "%(si)s-%(shnum)d: %(reason)s"),
578                 share_type=share_type, si=si_s, shnum=shnum, reason=reason,
579                 level=log.SCARY, umid="SGx2fA")
580         return None
581