]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage/server.py
storage: more paranoid handling of bounds and palimpsests in mutable share files
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage / server.py
1 import os, re, weakref, struct, time
2
3 from foolscap.api import Referenceable
4 from twisted.application import service
5
6 from zope.interface import implements
7 from allmydata.interfaces import RIStorageServer, IStatsProducer
8 from allmydata.util import fileutil, idlib, log, time_format
9 import allmydata # for __full_version__
10
11 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
12 _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
13 from allmydata.storage.lease import LeaseInfo
14 from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
15      create_mutable_sharefile
16 from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
17 from allmydata.storage.crawler import BucketCountingCrawler
18 from allmydata.storage.expirer import LeaseCheckingCrawler
19
20 # storage/
21 # storage/shares/incoming
22 #   incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
23 #   be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
24 # storage/shares/$START/$STORAGEINDEX
25 # storage/shares/$START/$STORAGEINDEX/$SHARENUM
26
27 # Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
28 # base-32 chars).
29
30 # $SHARENUM matches this regex:
NUM_RE = re.compile(r"^[0-9]+$")  # a valid $SHARENUM: one or more ASCII digits
32
33
34
class StorageServer(service.MultiService, Referenceable):
    """Foolscap-published storage server.

    Holds immutable share files and mutable slot files under
    ``<storedir>/shares``, keeps per-category latency samples for the
    stats gatherer, and runs two background crawlers: one that counts
    buckets and one that checks/expires leases.
    """
    implements(RIStorageServer, IStatsProducer)
    name = 'storage'
    # tests substitute their own crawler class via this hook
    LeaseCheckerClass = LeaseCheckingCrawler

    def __init__(self, storedir, nodeid, reserved_space=0,
                 discard_storage=False, readonly_storage=False,
                 stats_provider=None,
                 expiration_enabled=False,
                 expiration_mode="age",
                 expiration_override_lease_duration=None,
                 expiration_cutoff_date=None,
                 expiration_sharetypes=("mutable", "immutable")):
        """Create the server rooted at 'storedir'.

        'nodeid' is our 20-byte binary nodeid. 'reserved_space' (bytes) is
        kept free on the disk; 'discard_storage' makes writers throw away
        data (for tests); 'readonly_storage' refuses new shares. The
        expiration_* arguments are passed through to the lease checker.
        """
        service.MultiService.__init__(self)
        assert isinstance(nodeid, str)
        assert len(nodeid) == 20
        self.my_nodeid = nodeid
        self.storedir = storedir
        sharedir = os.path.join(storedir, "shares")
        fileutil.make_dirs(sharedir)
        self.sharedir = sharedir
        # we don't actually create the corruption-advisory dir until necessary
        self.corruption_advisory_dir = os.path.join(storedir,
                                                    "corruption-advisories")
        self.reserved_space = int(reserved_space)
        self.no_storage = discard_storage
        self.readonly_storage = readonly_storage
        self.stats_provider = stats_provider
        if self.stats_provider:
            self.stats_provider.register_producer(self)
        self.incomingdir = os.path.join(sharedir, 'incoming')
        # discard any partial shares left over from an interrupted upload,
        # then re-create the (now empty) incoming directory
        self._clean_incomplete()
        fileutil.make_dirs(self.incomingdir)
        self._active_writers = weakref.WeakKeyDictionary()
        log.msg("StorageServer created", facility="tahoe.storage")

        if reserved_space:
            if self.get_available_space() is None:
                # BUGFIX: this kwarg was misspelled 'umin=', so the stable
                # foolscap message-id was never attached to this log entry
                log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
                        umid="0wZ27w", level=log.UNUSUAL)

        self.latencies = {"allocate": [], # immutable
                          "write": [],
                          "close": [],
                          "read": [],
                          "get": [],
                          "writev": [], # mutable
                          "readv": [],
                          "add-lease": [], # both
                          "renew": [],
                          "cancel": [],
                          }
        self.add_bucket_counter()

        statefile = os.path.join(self.storedir, "lease_checker.state")
        historyfile = os.path.join(self.storedir, "lease_checker.history")
        klass = self.LeaseCheckerClass
        self.lease_checker = klass(self, statefile, historyfile,
                                   expiration_enabled, expiration_mode,
                                   expiration_override_lease_duration,
                                   expiration_cutoff_date,
                                   expiration_sharetypes)
        self.lease_checker.setServiceParent(self)

    def __repr__(self):
        return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)

    def add_bucket_counter(self):
        """Create and start the bucket-counting crawler."""
        statefile = os.path.join(self.storedir, "bucket_counter.state")
        self.bucket_counter = BucketCountingCrawler(self, statefile)
        self.bucket_counter.setServiceParent(self)

    def count(self, name, delta=1):
        """Bump the stats counter 'storage_server.<name>' by 'delta'."""
        if self.stats_provider:
            self.stats_provider.count("storage_server." + name, delta)

    def add_latency(self, category, latency):
        """Record one latency sample (seconds), keeping at most the most
        recent 1000 samples per category."""
        a = self.latencies[category]
        a.append(latency)
        if len(a) > 1000:
            self.latencies[category] = a[-1000:]

    def get_latencies(self):
        """Return a dict, indexed by category, that contains a dict of
        latency numbers for each category. If there are sufficient samples
        for unambiguous interpretation, each dict will contain the
        following keys: mean, 01_0_percentile, 10_0_percentile,
        50_0_percentile (median), 90_0_percentile, 95_0_percentile,
        99_0_percentile, 99_9_percentile.  If there are insufficient
        samples for a given percentile to be interpreted unambiguously
        that percentile will be reported as None. If no samples have been
        collected for the given category, then that category name will
        not be present in the return value. """
        # note that Amazon's Dynamo paper says they use 99.9% percentile.
        output = {}
        for category in self.latencies:
            if not self.latencies[category]:
                continue
            stats = {}
            samples = self.latencies[category][:]
            count = len(samples)
            stats["samplesize"] = count
            samples.sort()
            if count > 1:
                stats["mean"] = sum(samples) / count
            else:
                stats["mean"] = None

            # (percentile, key-name, minimum sample count for that
            # percentile to be distinguishable from its neighbors)
            orderstatlist = [(0.01, "01_0_percentile", 100), (0.1, "10_0_percentile", 10),\
                             (0.50, "50_0_percentile", 10), (0.90, "90_0_percentile", 10),\
                             (0.95, "95_0_percentile", 20), (0.99, "99_0_percentile", 100),\
                             (0.999, "99_9_percentile", 1000)]

            for percentile, percentilestring, minnumtoobserve in orderstatlist:
                if count >= minnumtoobserve:
                    stats[percentilestring] = samples[int(percentile*count)]
                else:
                    stats[percentilestring] = None

            output[category] = stats
        return output

    def log(self, *args, **kwargs):
        """log.msg() wrapper that defaults facility to tahoe.storage."""
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.storage"
        return log.msg(*args, **kwargs)

    def _clean_incomplete(self):
        # partial shares in incoming/ are useless after a restart
        fileutil.rm_dir(self.incomingdir)

    def get_stats(self):
        """Return our stats dict for the IStatsProducer interface."""
        # remember: RIStatsProvider requires that our return dict
        # contains numeric values.
        stats = { 'storage_server.allocated': self.allocated_size(), }
        stats['storage_server.reserved_space'] = self.reserved_space
        for category,ld in self.get_latencies().items():
            for name,v in ld.items():
                stats['storage_server.latencies.%s.%s' % (category, name)] = v

        try:
            disk = fileutil.get_disk_stats(self.sharedir, self.reserved_space)
            writeable = disk['avail'] > 0

            # spacetime predictors should use disk_avail / (d(disk_used)/dt)
            stats['storage_server.disk_total'] = disk['total']
            stats['storage_server.disk_used'] = disk['used']
            stats['storage_server.disk_free_for_root'] = disk['free_for_root']
            stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
            stats['storage_server.disk_avail'] = disk['avail']
        except AttributeError:
            # platform without statvfs/GetDiskFreeSpaceEx: assume writeable
            writeable = True
        except EnvironmentError:
            log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
            writeable = False

        if self.readonly_storage:
            stats['storage_server.disk_avail'] = 0
            writeable = False

        stats['storage_server.accepting_immutable_shares'] = int(writeable)
        s = self.bucket_counter.get_state()
        bucket_count = s.get("last-complete-bucket-count")
        if bucket_count:
            stats['storage_server.total_bucket_count'] = bucket_count
        return stats

    def get_available_space(self):
        """Returns available space for share storage in bytes, or None if no
        API to get this information is available."""

        if self.readonly_storage:
            return 0
        return fileutil.get_available_space(self.sharedir, self.reserved_space)

    def allocated_size(self):
        """Return the total bytes promised to currently-active writers."""
        space = 0
        for bw in self._active_writers:
            space += bw.allocated_size()
        return space

    def remote_get_version(self):
        """Describe this server's protocol version and capabilities."""
        remaining_space = self.get_available_space()
        if remaining_space is None:
            # We're on a platform that has no API to get disk stats.
            remaining_space = 2**64

        version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                    { "maximum-immutable-share-size": remaining_space,
                      "tolerates-immutable-read-overrun": True,
                      "delete-mutable-shares-with-zero-length-writev": True,
                      "fills-holes-with-zero-bytes": True,
                      "prevents-read-past-end-of-share-data": True,
                      },
                    "application-version": str(allmydata.__full_version__),
                    }
        return version

    def remote_allocate_buckets(self, storage_index,
                                renew_secret, cancel_secret,
                                sharenums, allocated_size,
                                canary, owner_num=0):
        """Allocate space for immutable shares.

        Returns (alreadygot, bucketwriters): the set of share numbers we
        already hold, and a dict of shnum->BucketWriter for the new ones
        we accepted.
        """
        # owner_num is not for clients to set, but rather it should be
        # curried into the PersonalStorageServer instance that is dedicated
        # to a particular owner.
        start = time.time()
        self.count("allocate")
        alreadygot = set()
        bucketwriters = {} # k: shnum, v: BucketWriter
        si_dir = storage_index_to_dir(storage_index)
        si_s = si_b2a(storage_index)

        log.msg("storage: allocate_buckets %s" % si_s)

        # in this implementation, the lease information (including secrets)
        # goes into the share files themselves. It could also be put into a
        # separate database. Note that the lease should not be added until
        # the BucketWriter has been closed.
        expire_time = time.time() + 31*24*60*60
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        max_space_per_bucket = allocated_size

        remaining_space = self.get_available_space()
        limited = remaining_space is not None
        if limited:
            # this is a bit conservative, since some of this allocated_size()
            # has already been written to disk, where it will show up in
            # get_available_space.
            remaining_space -= self.allocated_size()
        # self.readonly_storage causes remaining_space <= 0

        # fill alreadygot with all shares that we have, not just the ones
        # they asked about: this will save them a lot of work. Add or update
        # leases for all of them: if they want us to hold shares for this
        # file, they'll want us to hold leases for this file.
        for (shnum, fn) in self._get_bucket_shares(storage_index):
            alreadygot.add(shnum)
            sf = ShareFile(fn)
            sf.add_or_renew_lease(lease_info)

        for shnum in sharenums:
            incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
            finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
            if os.path.exists(finalhome):
                # great! we already have it. easy.
                pass
            elif os.path.exists(incominghome):
                # Note that we don't create BucketWriters for shnums that
                # have a partial share (in incoming/), so if a second upload
                # occurs while the first is still in progress, the second
                # uploader will use different storage servers.
                pass
            elif (not limited) or (remaining_space >= max_space_per_bucket):
                # ok! we need to create the new share file.
                bw = BucketWriter(self, incominghome, finalhome,
                                  max_space_per_bucket, lease_info, canary)
                if self.no_storage:
                    bw.throw_out_all_data = True
                bucketwriters[shnum] = bw
                self._active_writers[bw] = 1
                if limited:
                    remaining_space -= max_space_per_bucket
            else:
                # bummer! not enough space to accept this bucket
                pass

        if bucketwriters:
            fileutil.make_dirs(os.path.join(self.sharedir, si_dir))

        self.add_latency("allocate", time.time() - start)
        return alreadygot, bucketwriters

    def _iter_share_files(self, storage_index):
        """Yield a MutableShareFile or ShareFile for each recognizable
        share file we hold for this storage index; skip unrecognized
        files."""
        for shnum, filename in self._get_bucket_shares(storage_index):
            f = open(filename, 'rb')
            header = f.read(32)
            f.close()
            if header[:32] == MutableShareFile.MAGIC:
                sf = MutableShareFile(filename, self)
                # note: if the share has been migrated, the renew_lease()
                # call will throw an exception, with information to help the
                # client update the lease.
            elif header[:4] == struct.pack(">L", 1):
                # immutable container version 1
                sf = ShareFile(filename)
            else:
                continue # non-sharefile
            yield sf

    def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
                         owner_num=1):
        """Add (or renew) a one-month lease on every share of this bucket."""
        start = time.time()
        self.count("add-lease")
        new_expire_time = time.time() + 31*24*60*60
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               new_expire_time, self.my_nodeid)
        for sf in self._iter_share_files(storage_index):
            sf.add_or_renew_lease(lease_info)
        self.add_latency("add-lease", time.time() - start)
        return None

    def remote_renew_lease(self, storage_index, renew_secret):
        """Renew an existing lease on every share of this bucket.

        Raises IndexError if we hold no shares for this storage index.
        """
        start = time.time()
        self.count("renew")
        new_expire_time = time.time() + 31*24*60*60
        found_buckets = False
        for sf in self._iter_share_files(storage_index):
            found_buckets = True
            sf.renew_lease(renew_secret, new_expire_time)
        self.add_latency("renew", time.time() - start)
        if not found_buckets:
            raise IndexError("no such lease to renew")

    def bucket_writer_closed(self, bw, consumed_size):
        """Callback from BucketWriter.close(): account for the bytes and
        stop tracking the writer."""
        if self.stats_provider:
            self.stats_provider.count('storage_server.bytes_added', consumed_size)
        del self._active_writers[bw]

    def _get_bucket_shares(self, storage_index):
        """Return a list of (shnum, pathname) tuples for files that hold
        shares for this storage_index. In each tuple, 'shnum' will always be
        the integer form of the last component of 'pathname'."""
        storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
        try:
            for f in os.listdir(storagedir):
                if NUM_RE.match(f):
                    filename = os.path.join(storagedir, f)
                    yield (int(f), filename)
        except OSError:
            # Commonly caused by there being no buckets at all.
            pass

    def remote_get_buckets(self, storage_index):
        """Return a dict of shnum->BucketReader for our immutable shares."""
        start = time.time()
        self.count("get")
        si_s = si_b2a(storage_index)
        log.msg("storage: get_buckets %s" % si_s)
        bucketreaders = {} # k: sharenum, v: BucketReader
        for shnum, filename in self._get_bucket_shares(storage_index):
            bucketreaders[shnum] = BucketReader(self, filename,
                                                storage_index, shnum)
        self.add_latency("get", time.time() - start)
        return bucketreaders

    def get_leases(self, storage_index):
        """Provide an iterator that yields all of the leases attached to this
        bucket. Each lease is returned as a LeaseInfo instance.

        This method is not for client use.
        """

        # since all shares get the same lease data, we just grab the leases
        # from the first share
        try:
            # next() builtin instead of gen.next(): same behavior, and
            # forward-compatible
            shnum, filename = next(self._get_bucket_shares(storage_index))
            sf = ShareFile(filename)
            return sf.get_leases()
        except StopIteration:
            return iter([])

    def remote_slot_testv_and_readv_and_writev(self, storage_index,
                                               secrets,
                                               test_and_write_vectors,
                                               read_vector):
        """Atomic test-and-set on mutable slots.

        'secrets' is (write_enabler, renew_secret, cancel_secret). Each
        entry of test_and_write_vectors is (testv, datav, new_length);
        writes are applied only if every test vector passes. Returns
        (testv_is_good, read_data), where read_data is gathered before any
        writes are applied.
        """
        start = time.time()
        self.count("writev")
        si_s = si_b2a(storage_index)
        log.msg("storage: slot_writev %s" % si_s)
        si_dir = storage_index_to_dir(storage_index)
        (write_enabler, renew_secret, cancel_secret) = secrets
        # shares exist if there is a file for them
        bucketdir = os.path.join(self.sharedir, si_dir)
        shares = {}
        if os.path.isdir(bucketdir):
            for sharenum_s in os.listdir(bucketdir):
                try:
                    sharenum = int(sharenum_s)
                except ValueError:
                    continue
                filename = os.path.join(bucketdir, sharenum_s)
                msf = MutableShareFile(filename, self)
                msf.check_write_enabler(write_enabler, si_s)
                shares[sharenum] = msf
        # write_enabler is good for all existing shares.

        # Now evaluate test vectors.
        testv_is_good = True
        for sharenum in test_and_write_vectors:
            (testv, datav, new_length) = test_and_write_vectors[sharenum]
            if sharenum in shares:
                if not shares[sharenum].check_testv(testv):
                    self.log("testv failed: [%d]: %r" % (sharenum, testv))
                    testv_is_good = False
                    break
            else:
                # compare the vectors against an empty share, in which all
                # reads return empty strings.
                if not EmptyShare().check_testv(testv):
                    self.log("testv failed (empty): [%d] %r" % (sharenum,
                                                                testv))
                    testv_is_good = False
                    break

        # now gather the read vectors, before we do any writes
        read_data = {}
        for sharenum, share in shares.items():
            read_data[sharenum] = share.readv(read_vector)

        ownerid = 1 # TODO
        expire_time = time.time() + 31*24*60*60   # one month
        lease_info = LeaseInfo(ownerid,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        if testv_is_good:
            # now apply the write vectors
            for sharenum in test_and_write_vectors:
                (testv, datav, new_length) = test_and_write_vectors[sharenum]
                if new_length == 0:
                    if sharenum in shares:
                        shares[sharenum].unlink()
                else:
                    if sharenum not in shares:
                        # allocate a new share
                        allocated_size = 2000 # arbitrary, really
                        share = self._allocate_slot_share(bucketdir, secrets,
                                                          sharenum,
                                                          allocated_size,
                                                          owner_num=0)
                        shares[sharenum] = share
                    shares[sharenum].writev(datav, new_length)
                    # and update the lease
                    shares[sharenum].add_or_renew_lease(lease_info)

            # BUGFIX: delete the bucket directory if it is now empty. The
            # previous code consulted the loop variable 'new_length' here,
            # which raised NameError when test_and_write_vectors was empty
            # and skipped cleanup when the *last* vector was not a deletion
            # even though earlier vectors had unlinked shares.
            if os.path.isdir(bucketdir) and not os.listdir(bucketdir):
                os.rmdir(bucketdir)

        # all done
        self.add_latency("writev", time.time() - start)
        return (testv_is_good, read_data)

    def _allocate_slot_share(self, bucketdir, secrets, sharenum,
                             allocated_size, owner_num=0):
        """Create a new (empty) mutable share file and return its
        MutableShareFile."""
        (write_enabler, renew_secret, cancel_secret) = secrets
        my_nodeid = self.my_nodeid
        fileutil.make_dirs(bucketdir)
        filename = os.path.join(bucketdir, "%d" % sharenum)
        share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
                                         self)
        return share

    def remote_slot_readv(self, storage_index, shares, readv):
        """Read from mutable slots. An empty 'shares' list means read from
        every share we hold. Returns a dict of shnum->data."""
        start = time.time()
        self.count("readv")
        si_s = si_b2a(storage_index)
        lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
                     facility="tahoe.storage", level=log.OPERATIONAL)
        si_dir = storage_index_to_dir(storage_index)
        # shares exist if there is a file for them
        bucketdir = os.path.join(self.sharedir, si_dir)
        if not os.path.isdir(bucketdir):
            self.add_latency("readv", time.time() - start)
            return {}
        datavs = {}
        for sharenum_s in os.listdir(bucketdir):
            try:
                sharenum = int(sharenum_s)
            except ValueError:
                continue
            if sharenum in shares or not shares:
                filename = os.path.join(bucketdir, sharenum_s)
                msf = MutableShareFile(filename, self)
                datavs[sharenum] = msf.readv(readv)
        log.msg("returning shares %s" % (datavs.keys(),),
                facility="tahoe.storage", level=log.NOISY, parent=lp)
        self.add_latency("readv", time.time() - start)
        return datavs

    def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
                                    reason):
        """Record a client's corruption report as a small text file in the
        corruption-advisories directory."""
        fileutil.make_dirs(self.corruption_advisory_dir)
        now = time_format.iso_utc(sep="T")
        si_s = si_b2a(storage_index)
        # windows can't handle colons in the filename. BUGFIX: strip them
        # from the leaf name only -- the old code replaced colons in the
        # whole joined path, which would also mangle a drive letter (or any
        # colon) in the directory portion.
        fn = os.path.join(self.corruption_advisory_dir,
                          ("%s--%s-%d" % (now, si_s, shnum)).replace(":",""))
        f = open(fn, "w")
        try:
            f.write("report: Share Corruption\n")
            f.write("type: %s\n" % share_type)
            f.write("storage_index: %s\n" % si_s)
            f.write("share_number: %d\n" % shnum)
            f.write("\n")
            f.write(reason)
            f.write("\n")
        finally:
            # close even if writing the report fails partway
            f.close()
        log.msg(format=("client claims corruption in (%(share_type)s) " +
                        "%(si)s-%(shnum)d: %(reason)s"),
                share_type=share_type, si=si_s, shnum=shnum, reason=reason,
                level=log.SCARY, umid="SGx2fA")
        return None