import os, re, weakref, struct, time

from foolscap.api import Referenceable
from twisted.application import service

from zope.interface import implements
from allmydata.interfaces import RIStorageServer, IStatsProducer
from allmydata.util import fileutil, idlib, log, time_format
import allmydata # for __full_version__

from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
_pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
     create_mutable_sharefile
from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler

# storage/
# storage/shares/incoming
#   incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
#   be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
# storage/shares/$START/$STORAGEINDEX
# storage/shares/$START/$STORAGEINDEX/$SHARENUM

# Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
# base-32 chars).
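# For example (hypothetical storage index): if si_b2a(storage_index) is
# "whvukzqrkcoyzyb6lxphfx3zzi", then $START is "wh", and share 3 would be
# stored at storage/shares/wh/whvukzqrkcoyzyb6lxphfx3zzi/3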

# $SHARENUM matches this regex:
NUM_RE = re.compile("^[0-9]+$")


class StorageServer(service.MultiService, Referenceable):
    implements(RIStorageServer, IStatsProducer)
    name = 'storage'
    LeaseCheckerClass = LeaseCheckingCrawler

    def __init__(self, storedir, nodeid, reserved_space=0,
                 discard_storage=False, readonly_storage=False,
                 stats_provider=None,
                 expiration_enabled=False,
                 expiration_mode="age",
                 expiration_override_lease_duration=None,
                 expiration_cutoff_date=None,
                 expiration_sharetypes=("mutable", "immutable")):
        service.MultiService.__init__(self)
        assert isinstance(nodeid, str)
        assert len(nodeid) == 20
        self.my_nodeid = nodeid
        self.storedir = storedir
        sharedir = os.path.join(storedir, "shares")
        fileutil.make_dirs(sharedir)
        self.sharedir = sharedir
        # we don't actually create the corruption-advisory dir until necessary
        self.corruption_advisory_dir = os.path.join(storedir,
                                                    "corruption-advisories")
        self.reserved_space = int(reserved_space)
        self.no_storage = discard_storage
        self.readonly_storage = readonly_storage
        self.stats_provider = stats_provider
        if self.stats_provider:
            self.stats_provider.register_producer(self)
        self.incomingdir = os.path.join(sharedir, 'incoming')
        self._clean_incomplete()
        fileutil.make_dirs(self.incomingdir)
        self._active_writers = weakref.WeakKeyDictionary()
        log.msg("StorageServer created", facility="tahoe.storage")

        if reserved_space:
            if self.get_available_space() is None:
                log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
                        umid="0wZ27w", level=log.UNUSUAL)

        self.latencies = {"allocate": [], # immutable
                          "write": [],
                          "close": [],
                          "read": [],
                          "get": [],
                          "writev": [], # mutable
                          "readv": [],
                          "add-lease": [], # both
                          "renew": [],
                          "cancel": [],
                          }
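        # each latency category keeps at most 1000 recent samples (see
        # add_latency below), so the percentiles reported by get_latencies
        # reflect recent activity rather than the server's entire history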
        self.add_bucket_counter()

        statefile = os.path.join(self.storedir, "lease_checker.state")
        historyfile = os.path.join(self.storedir, "lease_checker.history")
        klass = self.LeaseCheckerClass
        self.lease_checker = klass(self, statefile, historyfile,
                                   expiration_enabled, expiration_mode,
                                   expiration_override_lease_duration,
                                   expiration_cutoff_date,
                                   expiration_sharetypes)
        self.lease_checker.setServiceParent(self)

    def __repr__(self):
        return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)

    def have_shares(self):
        # quick test to decide if we need to commit to an implicit
        # permutation-seed or if we should use a new one
        return bool(set(os.listdir(self.sharedir)) - set(["incoming"]))

    def add_bucket_counter(self):
        statefile = os.path.join(self.storedir, "bucket_counter.state")
        self.bucket_counter = BucketCountingCrawler(self, statefile)
        self.bucket_counter.setServiceParent(self)

    def count(self, name, delta=1):
        if self.stats_provider:
            self.stats_provider.count("storage_server." + name, delta)

    def add_latency(self, category, latency):
        a = self.latencies[category]
        a.append(latency)
        if len(a) > 1000:
            self.latencies[category] = a[-1000:]

    def get_latencies(self):
        """Return a dict, indexed by category, that contains a dict of
        latency numbers for each category. If there are sufficient samples
        for unambiguous interpretation, each dict will contain the
        following keys: mean, 01_0_percentile, 10_0_percentile,
        50_0_percentile (median), 90_0_percentile, 95_0_percentile,
        99_0_percentile, 99_9_percentile. If there are insufficient
        samples for a given percentile to be interpreted unambiguously,
        that percentile will be reported as None. If no samples have been
        collected for the given category, then that category name will
        not be present in the return value."""
        # note that Amazon's Dynamo paper says they use the 99.9th percentile.
        output = {}
        for category in self.latencies:
            if not self.latencies[category]:
                continue
            stats = {}
            samples = self.latencies[category][:]
            count = len(samples)
            stats["samplesize"] = count
            samples.sort()
            if count > 1:
                stats["mean"] = sum(samples) / count
            else:
                stats["mean"] = None

            orderstatlist = [(0.01, "01_0_percentile", 100), (0.1, "10_0_percentile", 10),
                             (0.50, "50_0_percentile", 10), (0.90, "90_0_percentile", 10),
                             (0.95, "95_0_percentile", 20), (0.99, "99_0_percentile", 100),
                             (0.999, "99_9_percentile", 1000)]
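            # The third element of each tuple is the minimum number of
            # samples needed before that percentile is reported: with fewer
            # samples, int(percentile*count) would just select one of the
            # largest samples (e.g. for count < 100 the "99th percentile"
            # would be the maximum), so we report None instead.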

            for percentile, percentilestring, minnumtoobserve in orderstatlist:
                if count >= minnumtoobserve:
                    stats[percentilestring] = samples[int(percentile*count)]
                else:
                    stats[percentilestring] = None

            output[category] = stats
        return output

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.storage"
        return log.msg(*args, **kwargs)

    def _clean_incomplete(self):
        fileutil.rm_dir(self.incomingdir)

    def get_stats(self):
        # remember: RIStatsProvider requires that our return dict
        # contains numeric values.
        stats = { 'storage_server.allocated': self.allocated_size(), }
        stats['storage_server.reserved_space'] = self.reserved_space
        for category,ld in self.get_latencies().items():
            for name,v in ld.items():
                stats['storage_server.latencies.%s.%s' % (category, name)] = v

        try:
            disk = fileutil.get_disk_stats(self.sharedir, self.reserved_space)
            writeable = disk['avail'] > 0

            # spacetime predictors should use disk_avail / (d(disk_used)/dt)
            stats['storage_server.disk_total'] = disk['total']
            stats['storage_server.disk_used'] = disk['used']
            stats['storage_server.disk_free_for_root'] = disk['free_for_root']
            stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
            stats['storage_server.disk_avail'] = disk['avail']
        except AttributeError:
            writeable = True
        except EnvironmentError:
            log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
            writeable = False

        if self.readonly_storage:
            stats['storage_server.disk_avail'] = 0
            writeable = False

        stats['storage_server.accepting_immutable_shares'] = int(writeable)
        s = self.bucket_counter.get_state()
        bucket_count = s.get("last-complete-bucket-count")
        if bucket_count:
            stats['storage_server.total_bucket_count'] = bucket_count
        return stats

    def get_available_space(self):
        """Returns available space for share storage in bytes, or None if no
        API to get this information is available."""

        if self.readonly_storage:
            return 0
        return fileutil.get_available_space(self.sharedir, self.reserved_space)

    def allocated_size(self):
        space = 0
        for bw in self._active_writers:
            space += bw.allocated_size()
        return space

    def remote_get_version(self):
        remaining_space = self.get_available_space()
        if remaining_space is None:
            # We're on a platform that has no API to get disk stats.
            remaining_space = 2**64

        version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                    { "maximum-immutable-share-size": remaining_space,
                      "maximum-mutable-share-size": MAX_MUTABLE_SHARE_SIZE,
                      "tolerates-immutable-read-overrun": True,
                      "delete-mutable-shares-with-zero-length-writev": True,
                      "fills-holes-with-zero-bytes": True,
                      "prevents-read-past-end-of-share-data": True,
                      },
                    "application-version": str(allmydata.__full_version__),
                    }
        return version

    def remote_allocate_buckets(self, storage_index,
                                renew_secret, cancel_secret,
                                sharenums, allocated_size,
                                canary, owner_num=0):
        # owner_num is not for clients to set, but rather it should be
        # curried into the PersonalStorageServer instance that is dedicated
        # to a particular owner.
        start = time.time()
        self.count("allocate")
        alreadygot = set()
        bucketwriters = {} # k: shnum, v: BucketWriter
        si_dir = storage_index_to_dir(storage_index)
        si_s = si_b2a(storage_index)

        log.msg("storage: allocate_buckets %s" % si_s)

        # in this implementation, the lease information (including secrets)
        # goes into the share files themselves. It could also be put into a
        # separate database. Note that the lease should not be added until
        # the BucketWriter has been closed.
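        # Leases are granted for 31 days (one month); if lease expiration
        # is enabled, the LeaseCheckingCrawler will eventually delete shares
        # whose leases have all expired.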
        expire_time = time.time() + 31*24*60*60
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        max_space_per_bucket = allocated_size

        remaining_space = self.get_available_space()
        limited = remaining_space is not None
        if limited:
            # this is a bit conservative, since some of this allocated_size()
            # has already been written to disk, where it will show up in
            # get_available_space.
            remaining_space -= self.allocated_size()
        # self.readonly_storage causes remaining_space <= 0

        # fill alreadygot with all shares that we have, not just the ones
        # they asked about: this will save them a lot of work. Add or update
        # leases for all of them: if they want us to hold shares for this
        # file, they'll want us to hold leases for this file.
        for (shnum, fn) in self._get_bucket_shares(storage_index):
            alreadygot.add(shnum)
            sf = ShareFile(fn)
            sf.add_or_renew_lease(lease_info)

        for shnum in sharenums:
            incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
            finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
            if os.path.exists(finalhome):
                # great! we already have it. easy.
                pass
            elif os.path.exists(incominghome):
                # Note that we don't create BucketWriters for shnums that
                # have a partial share (in incoming/), so if a second upload
                # occurs while the first is still in progress, the second
                # uploader will use different storage servers.
                pass
            elif (not limited) or (remaining_space >= max_space_per_bucket):
                # ok! we need to create the new share file.
                bw = BucketWriter(self, incominghome, finalhome,
                                  max_space_per_bucket, lease_info, canary)
                if self.no_storage:
                    bw.throw_out_all_data = True
                bucketwriters[shnum] = bw
                self._active_writers[bw] = 1
                if limited:
                    remaining_space -= max_space_per_bucket
            else:
                # bummer! not enough space to accept this bucket
                pass

        if bucketwriters:
            fileutil.make_dirs(os.path.join(self.sharedir, si_dir))

        self.add_latency("allocate", time.time() - start)
        return alreadygot, bucketwriters

    def _iter_share_files(self, storage_index):
        for shnum, filename in self._get_bucket_shares(storage_index):
            f = open(filename, 'rb')
            header = f.read(32)
            f.close()
            if header[:32] == MutableShareFile.MAGIC:
                sf = MutableShareFile(filename, self)
                # note: if the share has been migrated, the renew_lease()
                # call will throw an exception, with information to help the
                # client update the lease.
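            # immutable share files begin with a 4-byte big-endian version
            # number (currently 1), which is what this header test matches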
            elif header[:4] == struct.pack(">L", 1):
                sf = ShareFile(filename)
            else:
                continue # non-sharefile
            yield sf

    def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
                         owner_num=1):
        start = time.time()
        self.count("add-lease")
        new_expire_time = time.time() + 31*24*60*60
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               new_expire_time, self.my_nodeid)
        for sf in self._iter_share_files(storage_index):
            sf.add_or_renew_lease(lease_info)
        self.add_latency("add-lease", time.time() - start)
        return None

    def remote_renew_lease(self, storage_index, renew_secret):
        start = time.time()
        self.count("renew")
        new_expire_time = time.time() + 31*24*60*60
        found_buckets = False
        for sf in self._iter_share_files(storage_index):
            found_buckets = True
            sf.renew_lease(renew_secret, new_expire_time)
        self.add_latency("renew", time.time() - start)
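        # RIStorageServer specifies that renew_lease() raises IndexError
        # when no matching lease exists, so the client can tell that we
        # never had (or no longer hold) the corresponding shares.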
        if not found_buckets:
            raise IndexError("no such lease to renew")

    def bucket_writer_closed(self, bw, consumed_size):
        if self.stats_provider:
            self.stats_provider.count('storage_server.bytes_added', consumed_size)
        del self._active_writers[bw]

    def _get_bucket_shares(self, storage_index):
        """Generate (shnum, pathname) tuples for files that hold shares for
        this storage_index. In each tuple, 'shnum' will always be the
        integer form of the last component of 'pathname'."""
        storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
        try:
            for f in os.listdir(storagedir):
                if NUM_RE.match(f):
                    filename = os.path.join(storagedir, f)
                    yield (int(f), filename)
        except OSError:
            # Commonly caused by there being no buckets at all.
            pass

    def remote_get_buckets(self, storage_index):
        start = time.time()
        self.count("get")
        si_s = si_b2a(storage_index)
        log.msg("storage: get_buckets %s" % si_s)
        bucketreaders = {} # k: sharenum, v: BucketReader
        for shnum, filename in self._get_bucket_shares(storage_index):
            bucketreaders[shnum] = BucketReader(self, filename,
                                                storage_index, shnum)
        self.add_latency("get", time.time() - start)
        return bucketreaders

    def get_leases(self, storage_index):
        """Provide an iterator that yields all of the leases attached to this
        bucket. Each lease is returned as a LeaseInfo instance.

        This method is not for client use.
        """

        # since all shares get the same lease data, we just grab the leases
        # from the first share
        try:
            shnum, filename = self._get_bucket_shares(storage_index).next()
            sf = ShareFile(filename)
            return sf.get_leases()
        except StopIteration:
            return iter([])

    def remote_slot_testv_and_readv_and_writev(self, storage_index,
                                               secrets,
                                               test_and_write_vectors,
                                               read_vector):
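        # test_and_write_vectors maps each share number to a tuple of
        # (testv, datav, new_length): testv is a list of (offset, length,
        # operator, specimen) comparisons (operator is usually "eq") that
        # must all pass, datav is a list of (offset, data) writes to apply,
        # and new_length (if not None) sets the new share size. read_vector
        # is a list of (offset, length) spans to read from every share. See
        # RIStorageServer for the authoritative schema.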
        start = time.time()
        self.count("writev")
        si_s = si_b2a(storage_index)
        log.msg("storage: slot_writev %s" % si_s)
        si_dir = storage_index_to_dir(storage_index)
        (write_enabler, renew_secret, cancel_secret) = secrets
        # shares exist if there is a file for them
        bucketdir = os.path.join(self.sharedir, si_dir)
        shares = {}
        if os.path.isdir(bucketdir):
            for sharenum_s in os.listdir(bucketdir):
                try:
                    sharenum = int(sharenum_s)
                except ValueError:
                    continue
                filename = os.path.join(bucketdir, sharenum_s)
                msf = MutableShareFile(filename, self)
                msf.check_write_enabler(write_enabler, si_s)
                shares[sharenum] = msf
        # write_enabler is good for all existing shares.

        # Now evaluate test vectors.
        testv_is_good = True
        for sharenum in test_and_write_vectors:
            (testv, datav, new_length) = test_and_write_vectors[sharenum]
            if sharenum in shares:
                if not shares[sharenum].check_testv(testv):
                    self.log("testv failed: [%d]: %r" % (sharenum, testv))
                    testv_is_good = False
                    break
            else:
                # compare the vectors against an empty share, in which all
                # reads return empty strings.
                if not EmptyShare().check_testv(testv):
                    self.log("testv failed (empty): [%d] %r" % (sharenum,
                                                                testv))
                    testv_is_good = False
                    break

        # now gather the read vectors, before we do any writes
        read_data = {}
        for sharenum, share in shares.items():
            read_data[sharenum] = share.readv(read_vector)

        ownerid = 1 # TODO
        expire_time = time.time() + 31*24*60*60   # one month
        lease_info = LeaseInfo(ownerid,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        if testv_is_good:
            # now apply the write vectors
            for sharenum in test_and_write_vectors:
                (testv, datav, new_length) = test_and_write_vectors[sharenum]
                if new_length == 0:
                    if sharenum in shares:
                        shares[sharenum].unlink()
                else:
                    if sharenum not in shares:
                        # allocate a new share
                        allocated_size = 2000 # arbitrary, really
                        share = self._allocate_slot_share(bucketdir, secrets,
                                                          sharenum,
                                                          allocated_size,
                                                          owner_num=0)
                        shares[sharenum] = share
                    shares[sharenum].writev(datav, new_length)
                    # and update the lease
                    shares[sharenum].add_or_renew_lease(lease_info)

            # delete the bucket directory if the writevs left it empty
            # (a writev with new_length=0 deletes its share). We check the
            # directory itself rather than the leftover loop variable
            # 'new_length', which is unbound when no vectors were given.
            if os.path.isdir(bucketdir) and not os.listdir(bucketdir):
                os.rmdir(bucketdir)

        # all done
        self.add_latency("writev", time.time() - start)
        return (testv_is_good, read_data)

    def _allocate_slot_share(self, bucketdir, secrets, sharenum,
                             allocated_size, owner_num=0):
        (write_enabler, renew_secret, cancel_secret) = secrets
        my_nodeid = self.my_nodeid
        fileutil.make_dirs(bucketdir)
        filename = os.path.join(bucketdir, "%d" % sharenum)
        share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
                                         self)
        return share

    def remote_slot_readv(self, storage_index, shares, readv):
        start = time.time()
        self.count("readv")
        si_s = si_b2a(storage_index)
        lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
                     facility="tahoe.storage", level=log.OPERATIONAL)
        si_dir = storage_index_to_dir(storage_index)
        # shares exist if there is a file for them
        bucketdir = os.path.join(self.sharedir, si_dir)
        if not os.path.isdir(bucketdir):
            self.add_latency("readv", time.time() - start)
            return {}
        datavs = {}
        for sharenum_s in os.listdir(bucketdir):
            try:
                sharenum = int(sharenum_s)
            except ValueError:
                continue
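            # an empty 'shares' list means "read from all shares"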
            if sharenum in shares or not shares:
                filename = os.path.join(bucketdir, sharenum_s)
                msf = MutableShareFile(filename, self)
                datavs[sharenum] = msf.readv(readv)
        log.msg("returning shares %s" % (datavs.keys(),),
                facility="tahoe.storage", level=log.NOISY, parent=lp)
        self.add_latency("readv", time.time() - start)
        return datavs

    def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
                                    reason):
        fileutil.make_dirs(self.corruption_advisory_dir)
        now = time_format.iso_utc(sep="T")
        si_s = si_b2a(storage_index)
        # windows can't handle colons in the filename
        fn = os.path.join(self.corruption_advisory_dir,
                          "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
        f = open(fn, "w")
        f.write("report: Share Corruption\n")
        f.write("type: %s\n" % share_type)
        f.write("storage_index: %s\n" % si_s)
        f.write("share_number: %d\n" % shnum)
        f.write("\n")
        f.write(reason)
        f.write("\n")
        f.close()
        log.msg(format=("client claims corruption in (%(share_type)s) " +
                        "%(si)s-%(shnum)d: %(reason)s"),
                share_type=share_type, si=si_s, shnum=shnum, reason=reason,
                level=log.SCARY, umid="SGx2fA")
        return None