]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage/server.py
8350e813c11c709af7dd6d97aab355c76e6d23c9
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage / server.py
1 import os, re, weakref, struct, time
2
3 from foolscap.api import Referenceable
4 from twisted.application import service
5
6 from zope.interface import implements
7 from allmydata.interfaces import RIStorageServer, IStatsProducer
8 from allmydata.util import fileutil, idlib, log, time_format
9 import allmydata # for __full_version__
10
11 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
12 _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
13 from allmydata.storage.lease import LeaseInfo
14 from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
15      create_mutable_sharefile
16 from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
17 from allmydata.storage.crawler import BucketCountingCrawler
18 from allmydata.storage.expirer import LeaseCheckingCrawler
19
20 # storage/
21 # storage/shares/incoming
22 #   incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
23 #   be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
24 # storage/shares/$START/$STORAGEINDEX
25 # storage/shares/$START/$STORAGEINDEX/$SHARENUM
26
27 # Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
28 # base-32 chars).
29
30 # $SHARENUM matches this regex:
NUM_RE=re.compile("^[0-9]+$")  # share filenames are pure decimal integers
32
33
34
35 class StorageServer(service.MultiService, Referenceable):
36     implements(RIStorageServer, IStatsProducer)
37     name = 'storage'
38     LeaseCheckerClass = LeaseCheckingCrawler
39
40     def __init__(self, storedir, nodeid, reserved_space=0,
41                  discard_storage=False, readonly_storage=False,
42                  stats_provider=None,
43                  expiration_enabled=False,
44                  expiration_mode="age",
45                  expiration_override_lease_duration=None,
46                  expiration_cutoff_date=None,
47                  expiration_sharetypes=("mutable", "immutable")):
48         service.MultiService.__init__(self)
49         assert isinstance(nodeid, str)
50         assert len(nodeid) == 20
51         self.my_nodeid = nodeid
52         self.storedir = storedir
53         sharedir = os.path.join(storedir, "shares")
54         fileutil.make_dirs(sharedir)
55         self.sharedir = sharedir
56         # we don't actually create the corruption-advisory dir until necessary
57         self.corruption_advisory_dir = os.path.join(storedir,
58                                                     "corruption-advisories")
59         self.reserved_space = int(reserved_space)
60         self.no_storage = discard_storage
61         self.readonly_storage = readonly_storage
62         self.stats_provider = stats_provider
63         if self.stats_provider:
64             self.stats_provider.register_producer(self)
65         self.incomingdir = os.path.join(sharedir, 'incoming')
66         self._clean_incomplete()
67         fileutil.make_dirs(self.incomingdir)
68         self._active_writers = weakref.WeakKeyDictionary()
69         log.msg("StorageServer created", facility="tahoe.storage")
70
71         if reserved_space:
72             if self.get_available_space() is None:
73                 log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
74                         umin="0wZ27w", level=log.UNUSUAL)
75
76         self.latencies = {"allocate": [], # immutable
77                           "write": [],
78                           "close": [],
79                           "read": [],
80                           "get": [],
81                           "writev": [], # mutable
82                           "readv": [],
83                           "add-lease": [], # both
84                           "renew": [],
85                           "cancel": [],
86                           }
87         self.add_bucket_counter()
88
89         statefile = os.path.join(self.storedir, "lease_checker.state")
90         historyfile = os.path.join(self.storedir, "lease_checker.history")
91         klass = self.LeaseCheckerClass
92         self.lease_checker = klass(self, statefile, historyfile,
93                                    expiration_enabled, expiration_mode,
94                                    expiration_override_lease_duration,
95                                    expiration_cutoff_date,
96                                    expiration_sharetypes)
97         self.lease_checker.setServiceParent(self)
98
99     def __repr__(self):
100         return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
101
102     def add_bucket_counter(self):
103         statefile = os.path.join(self.storedir, "bucket_counter.state")
104         self.bucket_counter = BucketCountingCrawler(self, statefile)
105         self.bucket_counter.setServiceParent(self)
106
107     def count(self, name, delta=1):
108         if self.stats_provider:
109             self.stats_provider.count("storage_server." + name, delta)
110
111     def add_latency(self, category, latency):
112         a = self.latencies[category]
113         a.append(latency)
114         if len(a) > 1000:
115             self.latencies[category] = a[-1000:]
116
117     def get_latencies(self):
118         """Return a dict, indexed by category, that contains a dict of
119         latency numbers for each category. If there are sufficient samples
120         for unambiguous interpretation, each dict will contain the
121         following keys: mean, 01_0_percentile, 10_0_percentile,
122         50_0_percentile (median), 90_0_percentile, 95_0_percentile,
123         99_0_percentile, 99_9_percentile.  If there are insufficient
124         samples for a given percentile to be interpreted unambiguously
125         that percentile will be reported as None. If no samples have been
126         collected for the given category, then that category name will
127         not be present in the return value. """
128         # note that Amazon's Dynamo paper says they use 99.9% percentile.
129         output = {}
130         for category in self.latencies:
131             if not self.latencies[category]:
132                 continue
133             stats = {}
134             samples = self.latencies[category][:]
135             count = len(samples)
136             stats["samplesize"] = count
137             samples.sort()
138             if count > 1:
139                 stats["mean"] = sum(samples) / count
140             else:
141                 stats["mean"] = None
142
143             orderstatlist = [(0.01, "01_0_percentile", 100), (0.1, "10_0_percentile", 10),\
144                              (0.50, "50_0_percentile", 10), (0.90, "90_0_percentile", 10),\
145                              (0.95, "95_0_percentile", 20), (0.99, "99_0_percentile", 100),\
146                              (0.999, "99_9_percentile", 1000)]
147
148             for percentile, percentilestring, minnumtoobserve in orderstatlist:
149                 if count >= minnumtoobserve:
150                     stats[percentilestring] = samples[int(percentile*count)]
151                 else:
152                     stats[percentilestring] = None
153
154             output[category] = stats
155         return output
156
157     def log(self, *args, **kwargs):
158         if "facility" not in kwargs:
159             kwargs["facility"] = "tahoe.storage"
160         return log.msg(*args, **kwargs)
161
    def _clean_incomplete(self):
        # Discard any partially-uploaded shares left over from a previous
        # run by removing the whole incoming/ directory; __init__ recreates
        # it immediately afterwards.
        fileutil.rm_dir(self.incomingdir)
164
165     def get_stats(self):
166         # remember: RIStatsProvider requires that our return dict
167         # contains numeric values.
168         stats = { 'storage_server.allocated': self.allocated_size(), }
169         stats['storage_server.reserved_space'] = self.reserved_space
170         for category,ld in self.get_latencies().items():
171             for name,v in ld.items():
172                 stats['storage_server.latencies.%s.%s' % (category, name)] = v
173
174         try:
175             disk = fileutil.get_disk_stats(self.sharedir, self.reserved_space)
176             writeable = disk['avail'] > 0
177
178             # spacetime predictors should use disk_avail / (d(disk_used)/dt)
179             stats['storage_server.disk_total'] = disk['total']
180             stats['storage_server.disk_used'] = disk['used']
181             stats['storage_server.disk_free_for_root'] = disk['free_for_root']
182             stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
183             stats['storage_server.disk_avail'] = disk['avail']
184         except AttributeError:
185             writeable = True
186         except EnvironmentError:
187             log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
188             writeable = False
189
190         if self.readonly_storage:
191             stats['storage_server.disk_avail'] = 0
192             writeable = False
193
194         stats['storage_server.accepting_immutable_shares'] = int(writeable)
195         s = self.bucket_counter.get_state()
196         bucket_count = s.get("last-complete-bucket-count")
197         if bucket_count:
198             stats['storage_server.total_bucket_count'] = bucket_count
199         return stats
200
201     def get_available_space(self):
202         """Returns available space for share storage in bytes, or None if no
203         API to get this information is available."""
204
205         if self.readonly_storage:
206             return 0
207         return fileutil.get_available_space(self.sharedir, self.reserved_space)
208
209     def allocated_size(self):
210         space = 0
211         for bw in self._active_writers:
212             space += bw.allocated_size()
213         return space
214
215     def remote_get_version(self):
216         remaining_space = self.get_available_space()
217         if remaining_space is None:
218             # We're on a platform that has no API to get disk stats.
219             remaining_space = 2**64
220
221         version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
222                     { "maximum-immutable-share-size": remaining_space,
223                       "tolerates-immutable-read-overrun": True,
224                       "delete-mutable-shares-with-zero-length-writev": True,
225                       },
226                     "application-version": str(allmydata.__full_version__),
227                     }
228         return version
229
    def remote_allocate_buckets(self, storage_index,
                                renew_secret, cancel_secret,
                                sharenums, allocated_size,
                                canary, owner_num=0):
        """Allocate space for immutable shares of 'storage_index'.

        Returns (alreadygot, bucketwriters): 'alreadygot' is the set of
        share numbers already present on disk (all of them, not just the
        requested ones), and 'bucketwriters' maps each newly-allocated
        share number to a BucketWriter. Shares that already exist, are
        mid-upload, or don't fit in the remaining space get no writer.
        """
        # owner_num is not for clients to set, but rather it should be
        # curried into the PersonalStorageServer instance that is dedicated
        # to a particular owner.
        start = time.time()
        self.count("allocate")
        alreadygot = set()
        bucketwriters = {} # k: shnum, v: BucketWriter
        si_dir = storage_index_to_dir(storage_index)
        si_s = si_b2a(storage_index)

        log.msg("storage: allocate_buckets %s" % si_s)

        # in this implementation, the lease information (including secrets)
        # goes into the share files themselves. It could also be put into a
        # separate database. Note that the lease should not be added until
        # the BucketWriter has been closed.
        # leases last one month from now
        expire_time = time.time() + 31*24*60*60
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        max_space_per_bucket = allocated_size

        remaining_space = self.get_available_space()
        limited = remaining_space is not None
        if limited:
            # this is a bit conservative, since some of this allocated_size()
            # has already been written to disk, where it will show up in
            # get_available_space.
            remaining_space -= self.allocated_size()
        # self.readonly_storage causes remaining_space <= 0

        # fill alreadygot with all shares that we have, not just the ones
        # they asked about: this will save them a lot of work. Add or update
        # leases for all of them: if they want us to hold shares for this
        # file, they'll want us to hold leases for this file.
        for (shnum, fn) in self._get_bucket_shares(storage_index):
            alreadygot.add(shnum)
            sf = ShareFile(fn)
            sf.add_or_renew_lease(lease_info)

        for shnum in sharenums:
            incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
            finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
            if os.path.exists(finalhome):
                # great! we already have it. easy.
                pass
            elif os.path.exists(incominghome):
                # Note that we don't create BucketWriters for shnums that
                # have a partial share (in incoming/), so if a second upload
                # occurs while the first is still in progress, the second
                # uploader will use different storage servers.
                pass
            elif (not limited) or (remaining_space >= max_space_per_bucket):
                # ok! we need to create the new share file.
                bw = BucketWriter(self, incominghome, finalhome,
                                  max_space_per_bucket, lease_info, canary)
                if self.no_storage:
                    # testing mode: accept writes but discard the data
                    bw.throw_out_all_data = True
                bucketwriters[shnum] = bw
                self._active_writers[bw] = 1
                if limited:
                    remaining_space -= max_space_per_bucket
            else:
                # bummer! not enough space to accept this bucket
                pass

        # only create the bucket directory if we actually allocated something
        if bucketwriters:
            fileutil.make_dirs(os.path.join(self.sharedir, si_dir))

        self.add_latency("allocate", time.time() - start)
        return alreadygot, bucketwriters
306
307     def _iter_share_files(self, storage_index):
308         for shnum, filename in self._get_bucket_shares(storage_index):
309             f = open(filename, 'rb')
310             header = f.read(32)
311             f.close()
312             if header[:32] == MutableShareFile.MAGIC:
313                 sf = MutableShareFile(filename, self)
314                 # note: if the share has been migrated, the renew_lease()
315                 # call will throw an exception, with information to help the
316                 # client update the lease.
317             elif header[:4] == struct.pack(">L", 1):
318                 sf = ShareFile(filename)
319             else:
320                 continue # non-sharefile
321             yield sf
322
323     def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
324                          owner_num=1):
325         start = time.time()
326         self.count("add-lease")
327         new_expire_time = time.time() + 31*24*60*60
328         lease_info = LeaseInfo(owner_num,
329                                renew_secret, cancel_secret,
330                                new_expire_time, self.my_nodeid)
331         for sf in self._iter_share_files(storage_index):
332             sf.add_or_renew_lease(lease_info)
333         self.add_latency("add-lease", time.time() - start)
334         return None
335
336     def remote_renew_lease(self, storage_index, renew_secret):
337         start = time.time()
338         self.count("renew")
339         new_expire_time = time.time() + 31*24*60*60
340         found_buckets = False
341         for sf in self._iter_share_files(storage_index):
342             found_buckets = True
343             sf.renew_lease(renew_secret, new_expire_time)
344         self.add_latency("renew", time.time() - start)
345         if not found_buckets:
346             raise IndexError("no such lease to renew")
347
348     def bucket_writer_closed(self, bw, consumed_size):
349         if self.stats_provider:
350             self.stats_provider.count('storage_server.bytes_added', consumed_size)
351         del self._active_writers[bw]
352
353     def _get_bucket_shares(self, storage_index):
354         """Return a list of (shnum, pathname) tuples for files that hold
355         shares for this storage_index. In each tuple, 'shnum' will always be
356         the integer form of the last component of 'pathname'."""
357         storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
358         try:
359             for f in os.listdir(storagedir):
360                 if NUM_RE.match(f):
361                     filename = os.path.join(storagedir, f)
362                     yield (int(f), filename)
363         except OSError:
364             # Commonly caused by there being no buckets at all.
365             pass
366
367     def remote_get_buckets(self, storage_index):
368         start = time.time()
369         self.count("get")
370         si_s = si_b2a(storage_index)
371         log.msg("storage: get_buckets %s" % si_s)
372         bucketreaders = {} # k: sharenum, v: BucketReader
373         for shnum, filename in self._get_bucket_shares(storage_index):
374             bucketreaders[shnum] = BucketReader(self, filename,
375                                                 storage_index, shnum)
376         self.add_latency("get", time.time() - start)
377         return bucketreaders
378
379     def get_leases(self, storage_index):
380         """Provide an iterator that yields all of the leases attached to this
381         bucket. Each lease is returned as a LeaseInfo instance.
382
383         This method is not for client use.
384         """
385
386         # since all shares get the same lease data, we just grab the leases
387         # from the first share
388         try:
389             shnum, filename = self._get_bucket_shares(storage_index).next()
390             sf = ShareFile(filename)
391             return sf.get_leases()
392         except StopIteration:
393             return iter([])
394
395     def remote_slot_testv_and_readv_and_writev(self, storage_index,
396                                                secrets,
397                                                test_and_write_vectors,
398                                                read_vector):
399         start = time.time()
400         self.count("writev")
401         si_s = si_b2a(storage_index)
402         log.msg("storage: slot_writev %s" % si_s)
403         si_dir = storage_index_to_dir(storage_index)
404         (write_enabler, renew_secret, cancel_secret) = secrets
405         # shares exist if there is a file for them
406         bucketdir = os.path.join(self.sharedir, si_dir)
407         shares = {}
408         if os.path.isdir(bucketdir):
409             for sharenum_s in os.listdir(bucketdir):
410                 try:
411                     sharenum = int(sharenum_s)
412                 except ValueError:
413                     continue
414                 filename = os.path.join(bucketdir, sharenum_s)
415                 msf = MutableShareFile(filename, self)
416                 msf.check_write_enabler(write_enabler, si_s)
417                 shares[sharenum] = msf
418         # write_enabler is good for all existing shares.
419
420         # Now evaluate test vectors.
421         testv_is_good = True
422         for sharenum in test_and_write_vectors:
423             (testv, datav, new_length) = test_and_write_vectors[sharenum]
424             if sharenum in shares:
425                 if not shares[sharenum].check_testv(testv):
426                     self.log("testv failed: [%d]: %r" % (sharenum, testv))
427                     testv_is_good = False
428                     break
429             else:
430                 # compare the vectors against an empty share, in which all
431                 # reads return empty strings.
432                 if not EmptyShare().check_testv(testv):
433                     self.log("testv failed (empty): [%d] %r" % (sharenum,
434                                                                 testv))
435                     testv_is_good = False
436                     break
437
438         # now gather the read vectors, before we do any writes
439         read_data = {}
440         for sharenum, share in shares.items():
441             read_data[sharenum] = share.readv(read_vector)
442
443         ownerid = 1 # TODO
444         expire_time = time.time() + 31*24*60*60   # one month
445         lease_info = LeaseInfo(ownerid,
446                                renew_secret, cancel_secret,
447                                expire_time, self.my_nodeid)
448
449         if testv_is_good:
450             # now apply the write vectors
451             for sharenum in test_and_write_vectors:
452                 (testv, datav, new_length) = test_and_write_vectors[sharenum]
453                 if new_length == 0:
454                     if sharenum in shares:
455                         shares[sharenum].unlink()
456                 else:
457                     if sharenum not in shares:
458                         # allocate a new share
459                         allocated_size = 2000 # arbitrary, really
460                         share = self._allocate_slot_share(bucketdir, secrets,
461                                                           sharenum,
462                                                           allocated_size,
463                                                           owner_num=0)
464                         shares[sharenum] = share
465                     shares[sharenum].writev(datav, new_length)
466                     # and update the lease
467                     shares[sharenum].add_or_renew_lease(lease_info)
468
469             if new_length == 0:
470                 # delete empty bucket directories
471                 if not os.listdir(bucketdir):
472                     os.rmdir(bucketdir)
473
474
475         # all done
476         self.add_latency("writev", time.time() - start)
477         return (testv_is_good, read_data)
478
479     def _allocate_slot_share(self, bucketdir, secrets, sharenum,
480                              allocated_size, owner_num=0):
481         (write_enabler, renew_secret, cancel_secret) = secrets
482         my_nodeid = self.my_nodeid
483         fileutil.make_dirs(bucketdir)
484         filename = os.path.join(bucketdir, "%d" % sharenum)
485         share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
486                                          self)
487         return share
488
489     def remote_slot_readv(self, storage_index, shares, readv):
490         start = time.time()
491         self.count("readv")
492         si_s = si_b2a(storage_index)
493         lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
494                      facility="tahoe.storage", level=log.OPERATIONAL)
495         si_dir = storage_index_to_dir(storage_index)
496         # shares exist if there is a file for them
497         bucketdir = os.path.join(self.sharedir, si_dir)
498         if not os.path.isdir(bucketdir):
499             self.add_latency("readv", time.time() - start)
500             return {}
501         datavs = {}
502         for sharenum_s in os.listdir(bucketdir):
503             try:
504                 sharenum = int(sharenum_s)
505             except ValueError:
506                 continue
507             if sharenum in shares or not shares:
508                 filename = os.path.join(bucketdir, sharenum_s)
509                 msf = MutableShareFile(filename, self)
510                 datavs[sharenum] = msf.readv(readv)
511         log.msg("returning shares %s" % (datavs.keys(),),
512                 facility="tahoe.storage", level=log.NOISY, parent=lp)
513         self.add_latency("readv", time.time() - start)
514         return datavs
515
516     def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
517                                     reason):
518         fileutil.make_dirs(self.corruption_advisory_dir)
519         now = time_format.iso_utc(sep="T")
520         si_s = si_b2a(storage_index)
521         # windows can't handle colons in the filename
522         fn = os.path.join(self.corruption_advisory_dir,
523                           "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
524         f = open(fn, "w")
525         f.write("report: Share Corruption\n")
526         f.write("type: %s\n" % share_type)
527         f.write("storage_index: %s\n" % si_s)
528         f.write("share_number: %d\n" % shnum)
529         f.write("\n")
530         f.write(reason)
531         f.write("\n")
532         f.close()
533         log.msg(format=("client claims corruption in (%(share_type)s) " +
534                         "%(si)s-%(shnum)d: %(reason)s"),
535                 share_type=share_type, si=si_s, shnum=shnum, reason=reason,
536                 level=log.SCARY, umid="SGx2fA")
537         return None