]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage/server.py
7dd3cb4722b3036697ce5f733acbaafd1c85c711
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage / server.py
1 import os, re, weakref, struct, time
2
3 from foolscap.api import Referenceable
4 from twisted.application import service
5
6 from zope.interface import implements
7 from allmydata.interfaces import RIStorageServer, IStatsProducer
8 from allmydata.util import fileutil, idlib, log, time_format
9 import allmydata # for __full_version__
10
11 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
12 _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
13 from allmydata.storage.lease import LeaseInfo
14 from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
15      create_mutable_sharefile
16 from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
17 from allmydata.storage.crawler import BucketCountingCrawler
18 from allmydata.storage.expirer import LeaseCheckingCrawler
19
20 # storage/
21 # storage/shares/incoming
22 #   incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
23 #   be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
24 # storage/shares/$START/$STORAGEINDEX
25 # storage/shares/$START/$STORAGEINDEX/$SHARENUM
26
27 # Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
28 # base-32 chars).
29
30 # $SHARENUM matches this regex:
31 NUM_RE=re.compile("^[0-9]+$")
32
33
34
35 class StorageServer(service.MultiService, Referenceable):
36     implements(RIStorageServer, IStatsProducer)
37     name = 'storage'
38     LeaseCheckerClass = LeaseCheckingCrawler
39
40     def __init__(self, storedir, nodeid, reserved_space=0,
41                  discard_storage=False, readonly_storage=False,
42                  stats_provider=None,
43                  expiration_enabled=False,
44                  expiration_mode="age",
45                  expiration_override_lease_duration=None,
46                  expiration_cutoff_date=None,
47                  expiration_sharetypes=("mutable", "immutable")):
48         service.MultiService.__init__(self)
49         assert isinstance(nodeid, str)
50         assert len(nodeid) == 20
51         self.my_nodeid = nodeid
52         self.storedir = storedir
53         sharedir = os.path.join(storedir, "shares")
54         fileutil.make_dirs(sharedir)
55         self.sharedir = sharedir
56         # we don't actually create the corruption-advisory dir until necessary
57         self.corruption_advisory_dir = os.path.join(storedir,
58                                                     "corruption-advisories")
59         self.reserved_space = int(reserved_space)
60         self.no_storage = discard_storage
61         self.readonly_storage = readonly_storage
62         self.stats_provider = stats_provider
63         if self.stats_provider:
64             self.stats_provider.register_producer(self)
65         self.incomingdir = os.path.join(sharedir, 'incoming')
66         self._clean_incomplete()
67         fileutil.make_dirs(self.incomingdir)
68         self._active_writers = weakref.WeakKeyDictionary()
69         log.msg("StorageServer created", facility="tahoe.storage")
70
71         if reserved_space:
72             if self.get_available_space() is None:
73                 log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
74                         umin="0wZ27w", level=log.UNUSUAL)
75
76         self.latencies = {"allocate": [], # immutable
77                           "write": [],
78                           "close": [],
79                           "read": [],
80                           "get": [],
81                           "writev": [], # mutable
82                           "readv": [],
83                           "add-lease": [], # both
84                           "renew": [],
85                           "cancel": [],
86                           }
87         self.add_bucket_counter()
88
89         statefile = os.path.join(self.storedir, "lease_checker.state")
90         historyfile = os.path.join(self.storedir, "lease_checker.history")
91         klass = self.LeaseCheckerClass
92         self.lease_checker = klass(self, statefile, historyfile,
93                                    expiration_enabled, expiration_mode,
94                                    expiration_override_lease_duration,
95                                    expiration_cutoff_date,
96                                    expiration_sharetypes)
97         self.lease_checker.setServiceParent(self)
98
99     def __repr__(self):
100         return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
101
102     def add_bucket_counter(self):
103         statefile = os.path.join(self.storedir, "bucket_counter.state")
104         self.bucket_counter = BucketCountingCrawler(self, statefile)
105         self.bucket_counter.setServiceParent(self)
106
107     def count(self, name, delta=1):
108         if self.stats_provider:
109             self.stats_provider.count("storage_server." + name, delta)
110
111     def add_latency(self, category, latency):
112         a = self.latencies[category]
113         a.append(latency)
114         if len(a) > 1000:
115             self.latencies[category] = a[-1000:]
116
117     def get_latencies(self):
118         """Return a dict, indexed by category, that contains a dict of
119         latency numbers for each category. If there are sufficient samples
120         for unambiguous interpretation, each dict will contain the
121         following keys: mean, 01_0_percentile, 10_0_percentile,
122         50_0_percentile (median), 90_0_percentile, 95_0_percentile,
123         99_0_percentile, 99_9_percentile.  If there are insufficient
124         samples for a given percentile to be interpreted unambiguously
125         that percentile will be reported as None. If no samples have been
126         collected for the given category, then that category name will
127         not be present in the return value. """
128         # note that Amazon's Dynamo paper says they use 99.9% percentile.
129         output = {}
130         for category in self.latencies:
131             if not self.latencies[category]:
132                 continue
133             stats = {}
134             samples = self.latencies[category][:]
135             count = len(samples)
136             stats["samplesize"] = count
137             samples.sort()
138             if count > 1:
139                 stats["mean"] = sum(samples) / count
140             else:
141                 stats["mean"] = None
142
143             orderstatlist = [(0.01, "01_0_percentile", 100), (0.1, "10_0_percentile", 10),\
144                              (0.50, "50_0_percentile", 10), (0.90, "90_0_percentile", 10),\
145                              (0.95, "95_0_percentile", 20), (0.99, "99_0_percentile", 100),\
146                              (0.999, "99_9_percentile", 1000)]
147
148             for percentile, percentilestring, minnumtoobserve in orderstatlist:
149                 if count >= minnumtoobserve:
150                     stats[percentilestring] = samples[int(percentile*count)]
151                 else:
152                     stats[percentilestring] = None
153
154             output[category] = stats
155         return output
156
157     def log(self, *args, **kwargs):
158         if "facility" not in kwargs:
159             kwargs["facility"] = "tahoe.storage"
160         return log.msg(*args, **kwargs)
161
    def _clean_incomplete(self):
        # Discard any partially-uploaded shares left over from a previous
        # run by removing the whole incoming/ directory; __init__ re-creates
        # it immediately afterwards.
        fileutil.rm_dir(self.incomingdir)
164
    def get_stats(self):
        """Return a flat dict of numeric stats for the stats provider.

        Keys are dotted 'storage_server.*' names covering allocation,
        reserved space, per-category latency statistics, disk usage, and
        whether we are currently accepting immutable shares.
        """
        # remember: RIStatsProvider requires that our return dict
        # contains numeric values.
        stats = { 'storage_server.allocated': self.allocated_size(), }
        stats['storage_server.reserved_space'] = self.reserved_space
        # flatten get_latencies() into storage_server.latencies.<cat>.<name>
        for category,ld in self.get_latencies().items():
            for name,v in ld.items():
                stats['storage_server.latencies.%s.%s' % (category, name)] = v

        try:
            disk = fileutil.get_disk_stats(self.sharedir, self.reserved_space)
            writeable = disk['avail'] > 0

            # spacetime predictors should use disk_avail / (d(disk_used)/dt)
            stats['storage_server.disk_total'] = disk['total']
            stats['storage_server.disk_used'] = disk['used']
            stats['storage_server.disk_free_for_root'] = disk['free_for_root']
            stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
            stats['storage_server.disk_avail'] = disk['avail']
        except AttributeError:
            # this platform has no disk-stats API at all: optimistically
            # assume we can write
            writeable = True
        except EnvironmentError:
            # the API exists but the OS call failed: play it safe
            log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
            writeable = False

        if self.readonly_storage:
            stats['storage_server.disk_avail'] = 0
            writeable = False

        stats['storage_server.accepting_immutable_shares'] = int(writeable)
        s = self.bucket_counter.get_state()
        bucket_count = s.get("last-complete-bucket-count")
        if bucket_count:
            stats['storage_server.total_bucket_count'] = bucket_count
        return stats
200
201     def get_available_space(self):
202         """Returns available space for share storage in bytes, or None if no
203         API to get this information is available."""
204
205         if self.readonly_storage:
206             return 0
207         return fileutil.get_available_space(self.sharedir, self.reserved_space)
208
209     def allocated_size(self):
210         space = 0
211         for bw in self._active_writers:
212             space += bw.allocated_size()
213         return space
214
215     def remote_get_version(self):
216         remaining_space = self.get_available_space()
217         if remaining_space is None:
218             # We're on a platform that has no API to get disk stats.
219             remaining_space = 2**64
220
221         version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
222                     { "maximum-immutable-share-size": remaining_space,
223                       "tolerates-immutable-read-overrun": True,
224                       "delete-mutable-shares-with-zero-length-writev": True,
225                       "prevents-read-past-end-of-share-data": True,
226                       },
227                     "application-version": str(allmydata.__full_version__),
228                     }
229         return version
230
    def remote_allocate_buckets(self, storage_index,
                                renew_secret, cancel_secret,
                                sharenums, allocated_size,
                                canary, owner_num=0):
        """Allocate BucketWriters for immutable shares.

        Returns (alreadygot, bucketwriters): 'alreadygot' is the set of all
        share numbers we already hold for this storage index (not just the
        requested ones), with their leases added or renewed; 'bucketwriters'
        maps each newly-allocated share number to a BucketWriter. A
        requested share gets no writer when its final file already exists,
        when an upload of it is already in progress in incoming/, or when
        there is not enough remaining space for 'allocated_size' bytes.
        """
        # owner_num is not for clients to set, but rather it should be
        # curried into the PersonalStorageServer instance that is dedicated
        # to a particular owner.
        start = time.time()
        self.count("allocate")
        alreadygot = set()
        bucketwriters = {} # k: shnum, v: BucketWriter
        si_dir = storage_index_to_dir(storage_index)
        si_s = si_b2a(storage_index)

        log.msg("storage: allocate_buckets %s" % si_s)

        # in this implementation, the lease information (including secrets)
        # goes into the share files themselves. It could also be put into a
        # separate database. Note that the lease should not be added until
        # the BucketWriter has been closed.
        expire_time = time.time() + 31*24*60*60
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        max_space_per_bucket = allocated_size

        remaining_space = self.get_available_space()
        limited = remaining_space is not None
        if limited:
            # this is a bit conservative, since some of this allocated_size()
            # has already been written to disk, where it will show up in
            # get_available_space.
            remaining_space -= self.allocated_size()
        # self.readonly_storage causes remaining_space <= 0

        # fill alreadygot with all shares that we have, not just the ones
        # they asked about: this will save them a lot of work. Add or update
        # leases for all of them: if they want us to hold shares for this
        # file, they'll want us to hold leases for this file.
        for (shnum, fn) in self._get_bucket_shares(storage_index):
            alreadygot.add(shnum)
            sf = ShareFile(fn)
            sf.add_or_renew_lease(lease_info)

        for shnum in sharenums:
            incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
            finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
            if os.path.exists(finalhome):
                # great! we already have it. easy.
                pass
            elif os.path.exists(incominghome):
                # Note that we don't create BucketWriters for shnums that
                # have a partial share (in incoming/), so if a second upload
                # occurs while the first is still in progress, the second
                # uploader will use different storage servers.
                pass
            elif (not limited) or (remaining_space >= max_space_per_bucket):
                # ok! we need to create the new share file.
                bw = BucketWriter(self, incominghome, finalhome,
                                  max_space_per_bucket, lease_info, canary)
                if self.no_storage:
                    bw.throw_out_all_data = True
                bucketwriters[shnum] = bw
                self._active_writers[bw] = 1
                if limited:
                    remaining_space -= max_space_per_bucket
            else:
                # bummer! not enough space to accept this bucket
                pass

        if bucketwriters:
            fileutil.make_dirs(os.path.join(self.sharedir, si_dir))

        self.add_latency("allocate", time.time() - start)
        return alreadygot, bucketwriters
307
308     def _iter_share_files(self, storage_index):
309         for shnum, filename in self._get_bucket_shares(storage_index):
310             f = open(filename, 'rb')
311             header = f.read(32)
312             f.close()
313             if header[:32] == MutableShareFile.MAGIC:
314                 sf = MutableShareFile(filename, self)
315                 # note: if the share has been migrated, the renew_lease()
316                 # call will throw an exception, with information to help the
317                 # client update the lease.
318             elif header[:4] == struct.pack(">L", 1):
319                 sf = ShareFile(filename)
320             else:
321                 continue # non-sharefile
322             yield sf
323
324     def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
325                          owner_num=1):
326         start = time.time()
327         self.count("add-lease")
328         new_expire_time = time.time() + 31*24*60*60
329         lease_info = LeaseInfo(owner_num,
330                                renew_secret, cancel_secret,
331                                new_expire_time, self.my_nodeid)
332         for sf in self._iter_share_files(storage_index):
333             sf.add_or_renew_lease(lease_info)
334         self.add_latency("add-lease", time.time() - start)
335         return None
336
337     def remote_renew_lease(self, storage_index, renew_secret):
338         start = time.time()
339         self.count("renew")
340         new_expire_time = time.time() + 31*24*60*60
341         found_buckets = False
342         for sf in self._iter_share_files(storage_index):
343             found_buckets = True
344             sf.renew_lease(renew_secret, new_expire_time)
345         self.add_latency("renew", time.time() - start)
346         if not found_buckets:
347             raise IndexError("no such lease to renew")
348
349     def bucket_writer_closed(self, bw, consumed_size):
350         if self.stats_provider:
351             self.stats_provider.count('storage_server.bytes_added', consumed_size)
352         del self._active_writers[bw]
353
354     def _get_bucket_shares(self, storage_index):
355         """Return a list of (shnum, pathname) tuples for files that hold
356         shares for this storage_index. In each tuple, 'shnum' will always be
357         the integer form of the last component of 'pathname'."""
358         storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
359         try:
360             for f in os.listdir(storagedir):
361                 if NUM_RE.match(f):
362                     filename = os.path.join(storagedir, f)
363                     yield (int(f), filename)
364         except OSError:
365             # Commonly caused by there being no buckets at all.
366             pass
367
368     def remote_get_buckets(self, storage_index):
369         start = time.time()
370         self.count("get")
371         si_s = si_b2a(storage_index)
372         log.msg("storage: get_buckets %s" % si_s)
373         bucketreaders = {} # k: sharenum, v: BucketReader
374         for shnum, filename in self._get_bucket_shares(storage_index):
375             bucketreaders[shnum] = BucketReader(self, filename,
376                                                 storage_index, shnum)
377         self.add_latency("get", time.time() - start)
378         return bucketreaders
379
380     def get_leases(self, storage_index):
381         """Provide an iterator that yields all of the leases attached to this
382         bucket. Each lease is returned as a LeaseInfo instance.
383
384         This method is not for client use.
385         """
386
387         # since all shares get the same lease data, we just grab the leases
388         # from the first share
389         try:
390             shnum, filename = self._get_bucket_shares(storage_index).next()
391             sf = ShareFile(filename)
392             return sf.get_leases()
393         except StopIteration:
394             return iter([])
395
396     def remote_slot_testv_and_readv_and_writev(self, storage_index,
397                                                secrets,
398                                                test_and_write_vectors,
399                                                read_vector):
400         start = time.time()
401         self.count("writev")
402         si_s = si_b2a(storage_index)
403         log.msg("storage: slot_writev %s" % si_s)
404         si_dir = storage_index_to_dir(storage_index)
405         (write_enabler, renew_secret, cancel_secret) = secrets
406         # shares exist if there is a file for them
407         bucketdir = os.path.join(self.sharedir, si_dir)
408         shares = {}
409         if os.path.isdir(bucketdir):
410             for sharenum_s in os.listdir(bucketdir):
411                 try:
412                     sharenum = int(sharenum_s)
413                 except ValueError:
414                     continue
415                 filename = os.path.join(bucketdir, sharenum_s)
416                 msf = MutableShareFile(filename, self)
417                 msf.check_write_enabler(write_enabler, si_s)
418                 shares[sharenum] = msf
419         # write_enabler is good for all existing shares.
420
421         # Now evaluate test vectors.
422         testv_is_good = True
423         for sharenum in test_and_write_vectors:
424             (testv, datav, new_length) = test_and_write_vectors[sharenum]
425             if sharenum in shares:
426                 if not shares[sharenum].check_testv(testv):
427                     self.log("testv failed: [%d]: %r" % (sharenum, testv))
428                     testv_is_good = False
429                     break
430             else:
431                 # compare the vectors against an empty share, in which all
432                 # reads return empty strings.
433                 if not EmptyShare().check_testv(testv):
434                     self.log("testv failed (empty): [%d] %r" % (sharenum,
435                                                                 testv))
436                     testv_is_good = False
437                     break
438
439         # now gather the read vectors, before we do any writes
440         read_data = {}
441         for sharenum, share in shares.items():
442             read_data[sharenum] = share.readv(read_vector)
443
444         ownerid = 1 # TODO
445         expire_time = time.time() + 31*24*60*60   # one month
446         lease_info = LeaseInfo(ownerid,
447                                renew_secret, cancel_secret,
448                                expire_time, self.my_nodeid)
449
450         if testv_is_good:
451             # now apply the write vectors
452             for sharenum in test_and_write_vectors:
453                 (testv, datav, new_length) = test_and_write_vectors[sharenum]
454                 if new_length == 0:
455                     if sharenum in shares:
456                         shares[sharenum].unlink()
457                 else:
458                     if sharenum not in shares:
459                         # allocate a new share
460                         allocated_size = 2000 # arbitrary, really
461                         share = self._allocate_slot_share(bucketdir, secrets,
462                                                           sharenum,
463                                                           allocated_size,
464                                                           owner_num=0)
465                         shares[sharenum] = share
466                     shares[sharenum].writev(datav, new_length)
467                     # and update the lease
468                     shares[sharenum].add_or_renew_lease(lease_info)
469
470             if new_length == 0:
471                 # delete empty bucket directories
472                 if not os.listdir(bucketdir):
473                     os.rmdir(bucketdir)
474
475
476         # all done
477         self.add_latency("writev", time.time() - start)
478         return (testv_is_good, read_data)
479
480     def _allocate_slot_share(self, bucketdir, secrets, sharenum,
481                              allocated_size, owner_num=0):
482         (write_enabler, renew_secret, cancel_secret) = secrets
483         my_nodeid = self.my_nodeid
484         fileutil.make_dirs(bucketdir)
485         filename = os.path.join(bucketdir, "%d" % sharenum)
486         share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
487                                          self)
488         return share
489
490     def remote_slot_readv(self, storage_index, shares, readv):
491         start = time.time()
492         self.count("readv")
493         si_s = si_b2a(storage_index)
494         lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
495                      facility="tahoe.storage", level=log.OPERATIONAL)
496         si_dir = storage_index_to_dir(storage_index)
497         # shares exist if there is a file for them
498         bucketdir = os.path.join(self.sharedir, si_dir)
499         if not os.path.isdir(bucketdir):
500             self.add_latency("readv", time.time() - start)
501             return {}
502         datavs = {}
503         for sharenum_s in os.listdir(bucketdir):
504             try:
505                 sharenum = int(sharenum_s)
506             except ValueError:
507                 continue
508             if sharenum in shares or not shares:
509                 filename = os.path.join(bucketdir, sharenum_s)
510                 msf = MutableShareFile(filename, self)
511                 datavs[sharenum] = msf.readv(readv)
512         log.msg("returning shares %s" % (datavs.keys(),),
513                 facility="tahoe.storage", level=log.NOISY, parent=lp)
514         self.add_latency("readv", time.time() - start)
515         return datavs
516
517     def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
518                                     reason):
519         fileutil.make_dirs(self.corruption_advisory_dir)
520         now = time_format.iso_utc(sep="T")
521         si_s = si_b2a(storage_index)
522         # windows can't handle colons in the filename
523         fn = os.path.join(self.corruption_advisory_dir,
524                           "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
525         f = open(fn, "w")
526         f.write("report: Share Corruption\n")
527         f.write("type: %s\n" % share_type)
528         f.write("storage_index: %s\n" % si_s)
529         f.write("share_number: %d\n" % shnum)
530         f.write("\n")
531         f.write(reason)
532         f.write("\n")
533         f.close()
534         log.msg(format=("client claims corruption in (%(share_type)s) " +
535                         "%(si)s-%(shnum)d: %(reason)s"),
536                 share_type=share_type, si=si_s, shnum=shnum, reason=reason,
537                 level=log.SCARY, umid="SGx2fA")
538         return None