]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage/server.py
storage: use fileutil's version of get_disk_stats() and get_available_space(), use...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage / server.py
1 import os, re, weakref, struct, time
2
3 from foolscap.api import Referenceable
4 from twisted.application import service
5
6 from zope.interface import implements
7 from allmydata.interfaces import RIStorageServer, IStatsProducer
8 from allmydata.util import fileutil, idlib, log, time_format
9 import allmydata # for __full_version__
10
11 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
12 _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
13 from allmydata.storage.lease import LeaseInfo
14 from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
15      create_mutable_sharefile
16 from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
17 from allmydata.storage.crawler import BucketCountingCrawler
18 from allmydata.storage.expirer import LeaseCheckingCrawler
19
20 # storage/
21 # storage/shares/incoming
22 #   incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
23 #   be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
24 # storage/shares/$START/$STORAGEINDEX
25 # storage/shares/$START/$STORAGEINDEX/$SHARENUM
26
27 # Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
28 # base-32 chars).
29
# $SHARENUM matches this regex: share files are named by their decimal
# share number, with no leading zeros or other decoration.
NUM_RE=re.compile("^[0-9]+$")
32
33
34
class StorageServer(service.MultiService, Referenceable):
    """A storage server that holds immutable and mutable shares on local
    disk and serves them over Foolscap (RIStorageServer)."""
    implements(RIStorageServer, IStatsProducer)
    name = 'storage'  # twisted service name, used to look this service up
    # overridable by unit tests that want a different lease crawler
    LeaseCheckerClass = LeaseCheckingCrawler
39
40     def __init__(self, storedir, nodeid, reserved_space=0,
41                  discard_storage=False, readonly_storage=False,
42                  stats_provider=None,
43                  expiration_enabled=False,
44                  expiration_mode="age",
45                  expiration_override_lease_duration=None,
46                  expiration_cutoff_date=None,
47                  expiration_sharetypes=("mutable", "immutable")):
48         service.MultiService.__init__(self)
49         assert isinstance(nodeid, str)
50         assert len(nodeid) == 20
51         self.my_nodeid = nodeid
52         self.storedir = storedir
53         sharedir = os.path.join(storedir, "shares")
54         fileutil.make_dirs(sharedir)
55         self.sharedir = sharedir
56         # we don't actually create the corruption-advisory dir until necessary
57         self.corruption_advisory_dir = os.path.join(storedir,
58                                                     "corruption-advisories")
59         self.reserved_space = int(reserved_space)
60         self.no_storage = discard_storage
61         self.readonly_storage = readonly_storage
62         self.stats_provider = stats_provider
63         if self.stats_provider:
64             self.stats_provider.register_producer(self)
65         self.incomingdir = os.path.join(sharedir, 'incoming')
66         self._clean_incomplete()
67         fileutil.make_dirs(self.incomingdir)
68         self._active_writers = weakref.WeakKeyDictionary()
69         log.msg("StorageServer created", facility="tahoe.storage")
70
71         if reserved_space:
72             if self.get_available_space() is None:
73                 log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
74                         umin="0wZ27w", level=log.UNUSUAL)
75
76         self.latencies = {"allocate": [], # immutable
77                           "write": [],
78                           "close": [],
79                           "read": [],
80                           "get": [],
81                           "writev": [], # mutable
82                           "readv": [],
83                           "add-lease": [], # both
84                           "renew": [],
85                           "cancel": [],
86                           }
87         self.add_bucket_counter()
88
89         statefile = os.path.join(self.storedir, "lease_checker.state")
90         historyfile = os.path.join(self.storedir, "lease_checker.history")
91         klass = self.LeaseCheckerClass
92         self.lease_checker = klass(self, statefile, historyfile,
93                                    expiration_enabled, expiration_mode,
94                                    expiration_override_lease_duration,
95                                    expiration_cutoff_date,
96                                    expiration_sharetypes)
97         self.lease_checker.setServiceParent(self)
98
99     def __repr__(self):
100         return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
101
102     def add_bucket_counter(self):
103         statefile = os.path.join(self.storedir, "bucket_counter.state")
104         self.bucket_counter = BucketCountingCrawler(self, statefile)
105         self.bucket_counter.setServiceParent(self)
106
107     def count(self, name, delta=1):
108         if self.stats_provider:
109             self.stats_provider.count("storage_server." + name, delta)
110
111     def add_latency(self, category, latency):
112         a = self.latencies[category]
113         a.append(latency)
114         if len(a) > 1000:
115             self.latencies[category] = a[-1000:]
116
117     def get_latencies(self):
118         """Return a dict, indexed by category, that contains a dict of
119         latency numbers for each category. Each dict will contain the
120         following keys: mean, 01_0_percentile, 10_0_percentile,
121         50_0_percentile (median), 90_0_percentile, 95_0_percentile,
122         99_0_percentile, 99_9_percentile. If no samples have been collected
123         for the given category, then that category name will not be present
124         in the return value."""
125         # note that Amazon's Dynamo paper says they use 99.9% percentile.
126         output = {}
127         for category in self.latencies:
128             if not self.latencies[category]:
129                 continue
130             stats = {}
131             samples = self.latencies[category][:]
132             samples.sort()
133             count = len(samples)
134             stats["mean"] = sum(samples) / count
135             stats["01_0_percentile"] = samples[int(0.01 * count)]
136             stats["10_0_percentile"] = samples[int(0.1 * count)]
137             stats["50_0_percentile"] = samples[int(0.5 * count)]
138             stats["90_0_percentile"] = samples[int(0.9 * count)]
139             stats["95_0_percentile"] = samples[int(0.95 * count)]
140             stats["99_0_percentile"] = samples[int(0.99 * count)]
141             stats["99_9_percentile"] = samples[int(0.999 * count)]
142             output[category] = stats
143         return output
144
145     def log(self, *args, **kwargs):
146         if "facility" not in kwargs:
147             kwargs["facility"] = "tahoe.storage"
148         return log.msg(*args, **kwargs)
149
    def _clean_incomplete(self):
        # Discard the entire incoming/ directory: anything in it is a
        # partially-written share left over from an interrupted upload.
        fileutil.rm_dir(self.incomingdir)
152
    def get_stats(self):
        """Return a dict of numeric stats for the stats-gathering system.

        Covers allocated/reserved space, flattened per-category latency
        percentiles, disk usage (when the platform can report it), whether
        we are accepting new immutable shares, and the last-known bucket
        count from the bucket-counting crawler.
        """
        # remember: RIStatsProvider requires that our return dict
        # contains numeric values.
        stats = { 'storage_server.allocated': self.allocated_size(), }
        stats['storage_server.reserved_space'] = self.reserved_space
        # flatten {category: {name: value}} into dotted stat names
        for category,ld in self.get_latencies().items():
            for name,v in ld.items():
                stats['storage_server.latencies.%s.%s' % (category, name)] = v

        try:
            disk = fileutil.get_disk_stats(self.storedir, self.reserved_space)
            writeable = disk['avail'] > 0

            # spacetime predictors should use disk_avail / (d(disk_used)/dt)
            stats['storage_server.disk_total'] = disk['total']
            stats['storage_server.disk_used'] = disk['used']
            stats['storage_server.disk_free_for_root'] = disk['free_for_root']
            stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
            stats['storage_server.disk_avail'] = disk['avail']
        except AttributeError:
            # platform has no disk-stats API: optimistically assume we can
            # write (no disk figures are reported in this case)
            writeable = True
        except EnvironmentError:
            # the API exists but the OS call failed: play it safe
            log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
            writeable = False

        if self.readonly_storage:
            # a read-only server never accepts new shares, regardless of
            # how much space is actually free
            stats['storage_server.disk_avail'] = 0
            writeable = False

        stats['storage_server.accepting_immutable_shares'] = int(writeable)
        s = self.bucket_counter.get_state()
        bucket_count = s.get("last-complete-bucket-count")
        if bucket_count:
            stats['storage_server.total_bucket_count'] = bucket_count
        return stats
188
189     def get_available_space(self):
190         """Returns available space for share storage in bytes, or None if no
191         API to get this information is available."""
192
193         if self.readonly_storage:
194             return 0
195         return fileutil.get_available_space(self.storedir, self.reserved_space)
196
197     def allocated_size(self):
198         space = 0
199         for bw in self._active_writers:
200             space += bw.allocated_size()
201         return space
202
203     def remote_get_version(self):
204         remaining_space = self.get_available_space()
205         if remaining_space is None:
206             # We're on a platform that has no API to get disk stats.
207             remaining_space = 2**64
208
209         version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
210                     { "maximum-immutable-share-size": remaining_space,
211                       "tolerates-immutable-read-overrun": True,
212                       "delete-mutable-shares-with-zero-length-writev": True,
213                       },
214                     "application-version": str(allmydata.__full_version__),
215                     }
216         return version
217
    def remote_allocate_buckets(self, storage_index,
                                renew_secret, cancel_secret,
                                sharenums, allocated_size,
                                canary, owner_num=0):
        """Allocate space for shares of one bucket.

        Returns (alreadygot, bucketwriters): 'alreadygot' is the set of
        all share numbers we already hold on disk (not just the requested
        ones), and 'bucketwriters' maps each newly-allocated share number
        to a BucketWriter. A requested share may appear in neither when an
        upload of it is already in progress, or when there is not enough
        free space for it.
        """
        # owner_num is not for clients to set, but rather it should be
        # curried into the PersonalStorageServer instance that is dedicated
        # to a particular owner.
        start = time.time()
        self.count("allocate")
        alreadygot = set()
        bucketwriters = {} # k: shnum, v: BucketWriter
        si_dir = storage_index_to_dir(storage_index)
        si_s = si_b2a(storage_index)

        log.msg("storage: allocate_buckets %s" % si_s)

        # in this implementation, the lease information (including secrets)
        # goes into the share files themselves. It could also be put into a
        # separate database. Note that the lease should not be added until
        # the BucketWriter has been closed.
        expire_time = time.time() + 31*24*60*60
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        max_space_per_bucket = allocated_size

        remaining_space = self.get_available_space()
        limited = remaining_space is not None
        if limited:
            # this is a bit conservative, since some of this allocated_size()
            # has already been written to disk, where it will show up in
            # get_available_space.
            remaining_space -= self.allocated_size()
        # self.readonly_storage causes remaining_space <= 0

        # fill alreadygot with all shares that we have, not just the ones
        # they asked about: this will save them a lot of work. Add or update
        # leases for all of them: if they want us to hold shares for this
        # file, they'll want us to hold leases for this file.
        for (shnum, fn) in self._get_bucket_shares(storage_index):
            alreadygot.add(shnum)
            sf = ShareFile(fn)
            sf.add_or_renew_lease(lease_info)

        for shnum in sharenums:
            incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
            finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
            if os.path.exists(finalhome):
                # great! we already have it. easy.
                pass
            elif os.path.exists(incominghome):
                # Note that we don't create BucketWriters for shnums that
                # have a partial share (in incoming/), so if a second upload
                # occurs while the first is still in progress, the second
                # uploader will use different storage servers.
                pass
            elif (not limited) or (remaining_space >= max_space_per_bucket):
                # ok! we need to create the new share file.
                bw = BucketWriter(self, incominghome, finalhome,
                                  max_space_per_bucket, lease_info, canary)
                if self.no_storage:
                    # testing mode: accept the bytes but never store them
                    bw.throw_out_all_data = True
                bucketwriters[shnum] = bw
                self._active_writers[bw] = 1
                if limited:
                    # account for the full promised size, conservatively
                    remaining_space -= max_space_per_bucket
            else:
                # bummer! not enough space to accept this bucket
                pass

        if bucketwriters:
            fileutil.make_dirs(os.path.join(self.sharedir, si_dir))

        self.add_latency("allocate", time.time() - start)
        return alreadygot, bucketwriters
294
295     def _iter_share_files(self, storage_index):
296         for shnum, filename in self._get_bucket_shares(storage_index):
297             f = open(filename, 'rb')
298             header = f.read(32)
299             f.close()
300             if header[:32] == MutableShareFile.MAGIC:
301                 sf = MutableShareFile(filename, self)
302                 # note: if the share has been migrated, the renew_lease()
303                 # call will throw an exception, with information to help the
304                 # client update the lease.
305             elif header[:4] == struct.pack(">L", 1):
306                 sf = ShareFile(filename)
307             else:
308                 continue # non-sharefile
309             yield sf
310
311     def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
312                          owner_num=1):
313         start = time.time()
314         self.count("add-lease")
315         new_expire_time = time.time() + 31*24*60*60
316         lease_info = LeaseInfo(owner_num,
317                                renew_secret, cancel_secret,
318                                new_expire_time, self.my_nodeid)
319         for sf in self._iter_share_files(storage_index):
320             sf.add_or_renew_lease(lease_info)
321         self.add_latency("add-lease", time.time() - start)
322         return None
323
324     def remote_renew_lease(self, storage_index, renew_secret):
325         start = time.time()
326         self.count("renew")
327         new_expire_time = time.time() + 31*24*60*60
328         found_buckets = False
329         for sf in self._iter_share_files(storage_index):
330             found_buckets = True
331             sf.renew_lease(renew_secret, new_expire_time)
332         self.add_latency("renew", time.time() - start)
333         if not found_buckets:
334             raise IndexError("no such lease to renew")
335
    def remote_cancel_lease(self, storage_index, cancel_secret):
        """Cancel our lease on every share of this bucket; shares whose
        last lease is cancelled are deleted by cancel_lease(). Raises
        IndexError when we hold no shares, or (from cancel_lease) when the
        lease is not present."""
        start = time.time()
        self.count("cancel")

        total_space_freed = 0
        found_buckets = False
        for sf in self._iter_share_files(storage_index):
            # note: if we can't find a lease on one share, we won't bother
            # looking in the others. Unless something broke internally
            # (perhaps we ran out of disk space while adding a lease), the
            # leases on all shares will be identical.
            found_buckets = True
            # this raises IndexError if the lease wasn't present XXXX
            total_space_freed += sf.cancel_lease(cancel_secret)

        if found_buckets:
            # if cancelling deleted every share, remove the now-empty
            # bucket directory too
            storagedir = os.path.join(self.sharedir,
                                      storage_index_to_dir(storage_index))
            if not os.listdir(storagedir):
                os.rmdir(storagedir)

        if self.stats_provider:
            self.stats_provider.count('storage_server.bytes_freed',
                                      total_space_freed)
        self.add_latency("cancel", time.time() - start)
        if not found_buckets:
            raise IndexError("no such storage index")
363
364     def bucket_writer_closed(self, bw, consumed_size):
365         if self.stats_provider:
366             self.stats_provider.count('storage_server.bytes_added', consumed_size)
367         del self._active_writers[bw]
368
369     def _get_bucket_shares(self, storage_index):
370         """Return a list of (shnum, pathname) tuples for files that hold
371         shares for this storage_index. In each tuple, 'shnum' will always be
372         the integer form of the last component of 'pathname'."""
373         storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
374         try:
375             for f in os.listdir(storagedir):
376                 if NUM_RE.match(f):
377                     filename = os.path.join(storagedir, f)
378                     yield (int(f), filename)
379         except OSError:
380             # Commonly caused by there being no buckets at all.
381             pass
382
383     def remote_get_buckets(self, storage_index):
384         start = time.time()
385         self.count("get")
386         si_s = si_b2a(storage_index)
387         log.msg("storage: get_buckets %s" % si_s)
388         bucketreaders = {} # k: sharenum, v: BucketReader
389         for shnum, filename in self._get_bucket_shares(storage_index):
390             bucketreaders[shnum] = BucketReader(self, filename,
391                                                 storage_index, shnum)
392         self.add_latency("get", time.time() - start)
393         return bucketreaders
394
395     def get_leases(self, storage_index):
396         """Provide an iterator that yields all of the leases attached to this
397         bucket. Each lease is returned as a LeaseInfo instance.
398
399         This method is not for client use.
400         """
401
402         # since all shares get the same lease data, we just grab the leases
403         # from the first share
404         try:
405             shnum, filename = self._get_bucket_shares(storage_index).next()
406             sf = ShareFile(filename)
407             return sf.get_leases()
408         except StopIteration:
409             return iter([])
410
411     def remote_slot_testv_and_readv_and_writev(self, storage_index,
412                                                secrets,
413                                                test_and_write_vectors,
414                                                read_vector):
415         start = time.time()
416         self.count("writev")
417         si_s = si_b2a(storage_index)
418         log.msg("storage: slot_writev %s" % si_s)
419         si_dir = storage_index_to_dir(storage_index)
420         (write_enabler, renew_secret, cancel_secret) = secrets
421         # shares exist if there is a file for them
422         bucketdir = os.path.join(self.sharedir, si_dir)
423         shares = {}
424         if os.path.isdir(bucketdir):
425             for sharenum_s in os.listdir(bucketdir):
426                 try:
427                     sharenum = int(sharenum_s)
428                 except ValueError:
429                     continue
430                 filename = os.path.join(bucketdir, sharenum_s)
431                 msf = MutableShareFile(filename, self)
432                 msf.check_write_enabler(write_enabler, si_s)
433                 shares[sharenum] = msf
434         # write_enabler is good for all existing shares.
435
436         # Now evaluate test vectors.
437         testv_is_good = True
438         for sharenum in test_and_write_vectors:
439             (testv, datav, new_length) = test_and_write_vectors[sharenum]
440             if sharenum in shares:
441                 if not shares[sharenum].check_testv(testv):
442                     self.log("testv failed: [%d]: %r" % (sharenum, testv))
443                     testv_is_good = False
444                     break
445             else:
446                 # compare the vectors against an empty share, in which all
447                 # reads return empty strings.
448                 if not EmptyShare().check_testv(testv):
449                     self.log("testv failed (empty): [%d] %r" % (sharenum,
450                                                                 testv))
451                     testv_is_good = False
452                     break
453
454         # now gather the read vectors, before we do any writes
455         read_data = {}
456         for sharenum, share in shares.items():
457             read_data[sharenum] = share.readv(read_vector)
458
459         ownerid = 1 # TODO
460         expire_time = time.time() + 31*24*60*60   # one month
461         lease_info = LeaseInfo(ownerid,
462                                renew_secret, cancel_secret,
463                                expire_time, self.my_nodeid)
464
465         if testv_is_good:
466             # now apply the write vectors
467             for sharenum in test_and_write_vectors:
468                 (testv, datav, new_length) = test_and_write_vectors[sharenum]
469                 if new_length == 0:
470                     if sharenum in shares:
471                         shares[sharenum].unlink()
472                 else:
473                     if sharenum not in shares:
474                         # allocate a new share
475                         allocated_size = 2000 # arbitrary, really
476                         share = self._allocate_slot_share(bucketdir, secrets,
477                                                           sharenum,
478                                                           allocated_size,
479                                                           owner_num=0)
480                         shares[sharenum] = share
481                     shares[sharenum].writev(datav, new_length)
482                     # and update the lease
483                     shares[sharenum].add_or_renew_lease(lease_info)
484
485             if new_length == 0:
486                 # delete empty bucket directories
487                 if not os.listdir(bucketdir):
488                     os.rmdir(bucketdir)
489
490
491         # all done
492         self.add_latency("writev", time.time() - start)
493         return (testv_is_good, read_data)
494
495     def _allocate_slot_share(self, bucketdir, secrets, sharenum,
496                              allocated_size, owner_num=0):
497         (write_enabler, renew_secret, cancel_secret) = secrets
498         my_nodeid = self.my_nodeid
499         fileutil.make_dirs(bucketdir)
500         filename = os.path.join(bucketdir, "%d" % sharenum)
501         share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
502                                          self)
503         return share
504
505     def remote_slot_readv(self, storage_index, shares, readv):
506         start = time.time()
507         self.count("readv")
508         si_s = si_b2a(storage_index)
509         lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
510                      facility="tahoe.storage", level=log.OPERATIONAL)
511         si_dir = storage_index_to_dir(storage_index)
512         # shares exist if there is a file for them
513         bucketdir = os.path.join(self.sharedir, si_dir)
514         if not os.path.isdir(bucketdir):
515             self.add_latency("readv", time.time() - start)
516             return {}
517         datavs = {}
518         for sharenum_s in os.listdir(bucketdir):
519             try:
520                 sharenum = int(sharenum_s)
521             except ValueError:
522                 continue
523             if sharenum in shares or not shares:
524                 filename = os.path.join(bucketdir, sharenum_s)
525                 msf = MutableShareFile(filename, self)
526                 datavs[sharenum] = msf.readv(readv)
527         log.msg("returning shares %s" % (datavs.keys(),),
528                 facility="tahoe.storage", level=log.NOISY, parent=lp)
529         self.add_latency("readv", time.time() - start)
530         return datavs
531
532     def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
533                                     reason):
534         fileutil.make_dirs(self.corruption_advisory_dir)
535         now = time_format.iso_utc(sep="T")
536         si_s = si_b2a(storage_index)
537         # windows can't handle colons in the filename
538         fn = os.path.join(self.corruption_advisory_dir,
539                           "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
540         f = open(fn, "w")
541         f.write("report: Share Corruption\n")
542         f.write("type: %s\n" % share_type)
543         f.write("storage_index: %s\n" % si_s)
544         f.write("share_number: %d\n" % shnum)
545         f.write("\n")
546         f.write(reason)
547         f.write("\n")
548         f.close()
549         log.msg(format=("client claims corruption in (%(share_type)s) " +
550                         "%(si)s-%(shnum)d: %(reason)s"),
551                 share_type=share_type, si=si_s, shnum=shnum, reason=reason,
552                 level=log.SCARY, umid="SGx2fA")
553         return None
554