]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage/server.py
c5e5b3925b4f4a78b0a6e53083b6cec6f7380eb9
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage / server.py
1 import os, re, weakref, struct, time
2
3 from foolscap.api import Referenceable
4 from twisted.application import service
5
6 from zope.interface import implements
7 from allmydata.interfaces import RIStorageServer, IStatsProducer
8 from allmydata.util import fileutil, idlib, log, time_format
9 import allmydata # for __full_version__
10
11 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
12 _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
13 from allmydata.storage.lease import LeaseInfo
14 from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
15      create_mutable_sharefile
16 from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
17 from allmydata.storage.crawler import BucketCountingCrawler
18 from allmydata.storage.expirer import LeaseCheckingCrawler
19
20 # storage/
21 # storage/shares/incoming
22 #   incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
23 #   be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
24 # storage/shares/$START/$STORAGEINDEX
25 # storage/shares/$START/$STORAGEINDEX/$SHARENUM
26
27 # Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
28 # base-32 chars).
29
# $SHARENUM matches this regex:
# (share files are named by their non-negative integer share number, with
# no sign, whitespace, or other characters)
NUM_RE=re.compile("^[0-9]+$")
32
33
34
class StorageServer(service.MultiService, Referenceable):
    """A Foolscap-reachable storage server.

    Implements the RIStorageServer remote interface: it stores immutable
    and mutable share files under a local 'shares' directory, manages
    leases on them, and reports statistics via IStatsProducer. Background
    maintenance (bucket counting and lease expiration) is performed by
    crawler child services.
    """
    implements(RIStorageServer, IStatsProducer)
    # name under which this service registers in the MultiService tree
    name = 'storage'
    # class attribute so tests can substitute a different crawler class
    LeaseCheckerClass = LeaseCheckingCrawler
39
40     def __init__(self, storedir, nodeid, reserved_space=0,
41                  discard_storage=False, readonly_storage=False,
42                  stats_provider=None,
43                  expiration_enabled=False,
44                  expiration_mode="age",
45                  expiration_override_lease_duration=None,
46                  expiration_cutoff_date=None,
47                  expiration_sharetypes=("mutable", "immutable")):
48         service.MultiService.__init__(self)
49         assert isinstance(nodeid, str)
50         assert len(nodeid) == 20
51         self.my_nodeid = nodeid
52         self.storedir = storedir
53         sharedir = os.path.join(storedir, "shares")
54         fileutil.make_dirs(sharedir)
55         self.sharedir = sharedir
56         # we don't actually create the corruption-advisory dir until necessary
57         self.corruption_advisory_dir = os.path.join(storedir,
58                                                     "corruption-advisories")
59         self.reserved_space = int(reserved_space)
60         self.no_storage = discard_storage
61         self.readonly_storage = readonly_storage
62         self.stats_provider = stats_provider
63         if self.stats_provider:
64             self.stats_provider.register_producer(self)
65         self.incomingdir = os.path.join(sharedir, 'incoming')
66         self._clean_incomplete()
67         fileutil.make_dirs(self.incomingdir)
68         self._active_writers = weakref.WeakKeyDictionary()
69         log.msg("StorageServer created", facility="tahoe.storage")
70
71         if reserved_space:
72             if self.get_available_space() is None:
73                 log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
74                         umin="0wZ27w", level=log.UNUSUAL)
75
76         self.latencies = {"allocate": [], # immutable
77                           "write": [],
78                           "close": [],
79                           "read": [],
80                           "get": [],
81                           "writev": [], # mutable
82                           "readv": [],
83                           "add-lease": [], # both
84                           "renew": [],
85                           "cancel": [],
86                           }
87         self.add_bucket_counter()
88
89         statefile = os.path.join(self.storedir, "lease_checker.state")
90         historyfile = os.path.join(self.storedir, "lease_checker.history")
91         klass = self.LeaseCheckerClass
92         self.lease_checker = klass(self, statefile, historyfile,
93                                    expiration_enabled, expiration_mode,
94                                    expiration_override_lease_duration,
95                                    expiration_cutoff_date,
96                                    expiration_sharetypes)
97         self.lease_checker.setServiceParent(self)
98
99     def __repr__(self):
100         return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
101
102     def have_shares(self):
103         # quick test to decide if we need to commit to an implicit
104         # permutation-seed or if we should use a new one
105         return bool(set(os.listdir(self.sharedir)) - set(["incoming"]))
106
107     def add_bucket_counter(self):
108         statefile = os.path.join(self.storedir, "bucket_counter.state")
109         self.bucket_counter = BucketCountingCrawler(self, statefile)
110         self.bucket_counter.setServiceParent(self)
111
112     def count(self, name, delta=1):
113         if self.stats_provider:
114             self.stats_provider.count("storage_server." + name, delta)
115
116     def add_latency(self, category, latency):
117         a = self.latencies[category]
118         a.append(latency)
119         if len(a) > 1000:
120             self.latencies[category] = a[-1000:]
121
122     def get_latencies(self):
123         """Return a dict, indexed by category, that contains a dict of
124         latency numbers for each category. If there are sufficient samples
125         for unambiguous interpretation, each dict will contain the
126         following keys: mean, 01_0_percentile, 10_0_percentile,
127         50_0_percentile (median), 90_0_percentile, 95_0_percentile,
128         99_0_percentile, 99_9_percentile.  If there are insufficient
129         samples for a given percentile to be interpreted unambiguously
130         that percentile will be reported as None. If no samples have been
131         collected for the given category, then that category name will
132         not be present in the return value. """
133         # note that Amazon's Dynamo paper says they use 99.9% percentile.
134         output = {}
135         for category in self.latencies:
136             if not self.latencies[category]:
137                 continue
138             stats = {}
139             samples = self.latencies[category][:]
140             count = len(samples)
141             stats["samplesize"] = count
142             samples.sort()
143             if count > 1:
144                 stats["mean"] = sum(samples) / count
145             else:
146                 stats["mean"] = None
147
148             orderstatlist = [(0.01, "01_0_percentile", 100), (0.1, "10_0_percentile", 10),\
149                              (0.50, "50_0_percentile", 10), (0.90, "90_0_percentile", 10),\
150                              (0.95, "95_0_percentile", 20), (0.99, "99_0_percentile", 100),\
151                              (0.999, "99_9_percentile", 1000)]
152
153             for percentile, percentilestring, minnumtoobserve in orderstatlist:
154                 if count >= minnumtoobserve:
155                     stats[percentilestring] = samples[int(percentile*count)]
156                 else:
157                     stats[percentilestring] = None
158
159             output[category] = stats
160         return output
161
162     def log(self, *args, **kwargs):
163         if "facility" not in kwargs:
164             kwargs["facility"] = "tahoe.storage"
165         return log.msg(*args, **kwargs)
166
    def _clean_incomplete(self):
        # Discard any partially-uploaded shares left over from a previous
        # run; __init__ recreates the (now empty) incoming directory
        # immediately after calling this.
        fileutil.rm_dir(self.incomingdir)
169
170     def get_stats(self):
171         # remember: RIStatsProvider requires that our return dict
172         # contains numeric values.
173         stats = { 'storage_server.allocated': self.allocated_size(), }
174         stats['storage_server.reserved_space'] = self.reserved_space
175         for category,ld in self.get_latencies().items():
176             for name,v in ld.items():
177                 stats['storage_server.latencies.%s.%s' % (category, name)] = v
178
179         try:
180             disk = fileutil.get_disk_stats(self.sharedir, self.reserved_space)
181             writeable = disk['avail'] > 0
182
183             # spacetime predictors should use disk_avail / (d(disk_used)/dt)
184             stats['storage_server.disk_total'] = disk['total']
185             stats['storage_server.disk_used'] = disk['used']
186             stats['storage_server.disk_free_for_root'] = disk['free_for_root']
187             stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
188             stats['storage_server.disk_avail'] = disk['avail']
189         except AttributeError:
190             writeable = True
191         except EnvironmentError:
192             log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
193             writeable = False
194
195         if self.readonly_storage:
196             stats['storage_server.disk_avail'] = 0
197             writeable = False
198
199         stats['storage_server.accepting_immutable_shares'] = int(writeable)
200         s = self.bucket_counter.get_state()
201         bucket_count = s.get("last-complete-bucket-count")
202         if bucket_count:
203             stats['storage_server.total_bucket_count'] = bucket_count
204         return stats
205
206     def get_available_space(self):
207         """Returns available space for share storage in bytes, or None if no
208         API to get this information is available."""
209
210         if self.readonly_storage:
211             return 0
212         return fileutil.get_available_space(self.sharedir, self.reserved_space)
213
214     def allocated_size(self):
215         space = 0
216         for bw in self._active_writers:
217             space += bw.allocated_size()
218         return space
219
220     def remote_get_version(self):
221         remaining_space = self.get_available_space()
222         if remaining_space is None:
223             # We're on a platform that has no API to get disk stats.
224             remaining_space = 2**64
225
226         version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
227                     { "maximum-immutable-share-size": remaining_space,
228                       "tolerates-immutable-read-overrun": True,
229                       "delete-mutable-shares-with-zero-length-writev": True,
230                       "fills-holes-with-zero-bytes": True,
231                       "prevents-read-past-end-of-share-data": True,
232                       },
233                     "application-version": str(allmydata.__full_version__),
234                     }
235         return version
236
    def remote_allocate_buckets(self, storage_index,
                                renew_secret, cancel_secret,
                                sharenums, allocated_size,
                                canary, owner_num=0):
        """Allocate space for the given share numbers of one storage index.

        Returns (alreadygot, bucketwriters): 'alreadygot' is the set of
        share numbers we already hold complete copies of (not limited to
        the requested sharenums), and 'bucketwriters' maps each newly
        allocated share number to a BucketWriter. Requested shares that
        are already present, already being uploaded, or that don't fit in
        the remaining space get no BucketWriter.
        """
        # owner_num is not for clients to set, but rather it should be
        # curried into the PersonalStorageServer instance that is dedicated
        # to a particular owner.
        start = time.time()
        self.count("allocate")
        alreadygot = set()
        bucketwriters = {} # k: shnum, v: BucketWriter
        si_dir = storage_index_to_dir(storage_index)
        si_s = si_b2a(storage_index)

        log.msg("storage: allocate_buckets %s" % si_s)

        # in this implementation, the lease information (including secrets)
        # goes into the share files themselves. It could also be put into a
        # separate database. Note that the lease should not be added until
        # the BucketWriter has been closed.
        expire_time = time.time() + 31*24*60*60
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        max_space_per_bucket = allocated_size

        remaining_space = self.get_available_space()
        # remaining_space is None when the platform has no disk-stats API;
        # in that case we accept everything
        limited = remaining_space is not None
        if limited:
            # this is a bit conservative, since some of this allocated_size()
            # has already been written to disk, where it will show up in
            # get_available_space.
            remaining_space -= self.allocated_size()
        # self.readonly_storage causes remaining_space <= 0

        # fill alreadygot with all shares that we have, not just the ones
        # they asked about: this will save them a lot of work. Add or update
        # leases for all of them: if they want us to hold shares for this
        # file, they'll want us to hold leases for this file.
        for (shnum, fn) in self._get_bucket_shares(storage_index):
            alreadygot.add(shnum)
            sf = ShareFile(fn)
            sf.add_or_renew_lease(lease_info)

        for shnum in sharenums:
            incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
            finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
            if os.path.exists(finalhome):
                # great! we already have it. easy.
                pass
            elif os.path.exists(incominghome):
                # Note that we don't create BucketWriters for shnums that
                # have a partial share (in incoming/), so if a second upload
                # occurs while the first is still in progress, the second
                # uploader will use different storage servers.
                pass
            elif (not limited) or (remaining_space >= max_space_per_bucket):
                # ok! we need to create the new share file.
                bw = BucketWriter(self, incominghome, finalhome,
                                  max_space_per_bucket, lease_info, canary)
                if self.no_storage:
                    bw.throw_out_all_data = True
                bucketwriters[shnum] = bw
                # track the writer so allocated_size() counts its promise
                self._active_writers[bw] = 1
                if limited:
                    remaining_space -= max_space_per_bucket
            else:
                # bummer! not enough space to accept this bucket
                pass

        if bucketwriters:
            fileutil.make_dirs(os.path.join(self.sharedir, si_dir))

        self.add_latency("allocate", time.time() - start)
        return alreadygot, bucketwriters
313
314     def _iter_share_files(self, storage_index):
315         for shnum, filename in self._get_bucket_shares(storage_index):
316             f = open(filename, 'rb')
317             header = f.read(32)
318             f.close()
319             if header[:32] == MutableShareFile.MAGIC:
320                 sf = MutableShareFile(filename, self)
321                 # note: if the share has been migrated, the renew_lease()
322                 # call will throw an exception, with information to help the
323                 # client update the lease.
324             elif header[:4] == struct.pack(">L", 1):
325                 sf = ShareFile(filename)
326             else:
327                 continue # non-sharefile
328             yield sf
329
330     def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
331                          owner_num=1):
332         start = time.time()
333         self.count("add-lease")
334         new_expire_time = time.time() + 31*24*60*60
335         lease_info = LeaseInfo(owner_num,
336                                renew_secret, cancel_secret,
337                                new_expire_time, self.my_nodeid)
338         for sf in self._iter_share_files(storage_index):
339             sf.add_or_renew_lease(lease_info)
340         self.add_latency("add-lease", time.time() - start)
341         return None
342
343     def remote_renew_lease(self, storage_index, renew_secret):
344         start = time.time()
345         self.count("renew")
346         new_expire_time = time.time() + 31*24*60*60
347         found_buckets = False
348         for sf in self._iter_share_files(storage_index):
349             found_buckets = True
350             sf.renew_lease(renew_secret, new_expire_time)
351         self.add_latency("renew", time.time() - start)
352         if not found_buckets:
353             raise IndexError("no such lease to renew")
354
355     def bucket_writer_closed(self, bw, consumed_size):
356         if self.stats_provider:
357             self.stats_provider.count('storage_server.bytes_added', consumed_size)
358         del self._active_writers[bw]
359
360     def _get_bucket_shares(self, storage_index):
361         """Return a list of (shnum, pathname) tuples for files that hold
362         shares for this storage_index. In each tuple, 'shnum' will always be
363         the integer form of the last component of 'pathname'."""
364         storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
365         try:
366             for f in os.listdir(storagedir):
367                 if NUM_RE.match(f):
368                     filename = os.path.join(storagedir, f)
369                     yield (int(f), filename)
370         except OSError:
371             # Commonly caused by there being no buckets at all.
372             pass
373
374     def remote_get_buckets(self, storage_index):
375         start = time.time()
376         self.count("get")
377         si_s = si_b2a(storage_index)
378         log.msg("storage: get_buckets %s" % si_s)
379         bucketreaders = {} # k: sharenum, v: BucketReader
380         for shnum, filename in self._get_bucket_shares(storage_index):
381             bucketreaders[shnum] = BucketReader(self, filename,
382                                                 storage_index, shnum)
383         self.add_latency("get", time.time() - start)
384         return bucketreaders
385
386     def get_leases(self, storage_index):
387         """Provide an iterator that yields all of the leases attached to this
388         bucket. Each lease is returned as a LeaseInfo instance.
389
390         This method is not for client use.
391         """
392
393         # since all shares get the same lease data, we just grab the leases
394         # from the first share
395         try:
396             shnum, filename = self._get_bucket_shares(storage_index).next()
397             sf = ShareFile(filename)
398             return sf.get_leases()
399         except StopIteration:
400             return iter([])
401
402     def remote_slot_testv_and_readv_and_writev(self, storage_index,
403                                                secrets,
404                                                test_and_write_vectors,
405                                                read_vector):
406         start = time.time()
407         self.count("writev")
408         si_s = si_b2a(storage_index)
409         log.msg("storage: slot_writev %s" % si_s)
410         si_dir = storage_index_to_dir(storage_index)
411         (write_enabler, renew_secret, cancel_secret) = secrets
412         # shares exist if there is a file for them
413         bucketdir = os.path.join(self.sharedir, si_dir)
414         shares = {}
415         if os.path.isdir(bucketdir):
416             for sharenum_s in os.listdir(bucketdir):
417                 try:
418                     sharenum = int(sharenum_s)
419                 except ValueError:
420                     continue
421                 filename = os.path.join(bucketdir, sharenum_s)
422                 msf = MutableShareFile(filename, self)
423                 msf.check_write_enabler(write_enabler, si_s)
424                 shares[sharenum] = msf
425         # write_enabler is good for all existing shares.
426
427         # Now evaluate test vectors.
428         testv_is_good = True
429         for sharenum in test_and_write_vectors:
430             (testv, datav, new_length) = test_and_write_vectors[sharenum]
431             if sharenum in shares:
432                 if not shares[sharenum].check_testv(testv):
433                     self.log("testv failed: [%d]: %r" % (sharenum, testv))
434                     testv_is_good = False
435                     break
436             else:
437                 # compare the vectors against an empty share, in which all
438                 # reads return empty strings.
439                 if not EmptyShare().check_testv(testv):
440                     self.log("testv failed (empty): [%d] %r" % (sharenum,
441                                                                 testv))
442                     testv_is_good = False
443                     break
444
445         # now gather the read vectors, before we do any writes
446         read_data = {}
447         for sharenum, share in shares.items():
448             read_data[sharenum] = share.readv(read_vector)
449
450         ownerid = 1 # TODO
451         expire_time = time.time() + 31*24*60*60   # one month
452         lease_info = LeaseInfo(ownerid,
453                                renew_secret, cancel_secret,
454                                expire_time, self.my_nodeid)
455
456         if testv_is_good:
457             # now apply the write vectors
458             for sharenum in test_and_write_vectors:
459                 (testv, datav, new_length) = test_and_write_vectors[sharenum]
460                 if new_length == 0:
461                     if sharenum in shares:
462                         shares[sharenum].unlink()
463                 else:
464                     if sharenum not in shares:
465                         # allocate a new share
466                         allocated_size = 2000 # arbitrary, really
467                         share = self._allocate_slot_share(bucketdir, secrets,
468                                                           sharenum,
469                                                           allocated_size,
470                                                           owner_num=0)
471                         shares[sharenum] = share
472                     shares[sharenum].writev(datav, new_length)
473                     # and update the lease
474                     shares[sharenum].add_or_renew_lease(lease_info)
475
476             if new_length == 0:
477                 # delete empty bucket directories
478                 if not os.listdir(bucketdir):
479                     os.rmdir(bucketdir)
480
481
482         # all done
483         self.add_latency("writev", time.time() - start)
484         return (testv_is_good, read_data)
485
486     def _allocate_slot_share(self, bucketdir, secrets, sharenum,
487                              allocated_size, owner_num=0):
488         (write_enabler, renew_secret, cancel_secret) = secrets
489         my_nodeid = self.my_nodeid
490         fileutil.make_dirs(bucketdir)
491         filename = os.path.join(bucketdir, "%d" % sharenum)
492         share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
493                                          self)
494         return share
495
496     def remote_slot_readv(self, storage_index, shares, readv):
497         start = time.time()
498         self.count("readv")
499         si_s = si_b2a(storage_index)
500         lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
501                      facility="tahoe.storage", level=log.OPERATIONAL)
502         si_dir = storage_index_to_dir(storage_index)
503         # shares exist if there is a file for them
504         bucketdir = os.path.join(self.sharedir, si_dir)
505         if not os.path.isdir(bucketdir):
506             self.add_latency("readv", time.time() - start)
507             return {}
508         datavs = {}
509         for sharenum_s in os.listdir(bucketdir):
510             try:
511                 sharenum = int(sharenum_s)
512             except ValueError:
513                 continue
514             if sharenum in shares or not shares:
515                 filename = os.path.join(bucketdir, sharenum_s)
516                 msf = MutableShareFile(filename, self)
517                 datavs[sharenum] = msf.readv(readv)
518         log.msg("returning shares %s" % (datavs.keys(),),
519                 facility="tahoe.storage", level=log.NOISY, parent=lp)
520         self.add_latency("readv", time.time() - start)
521         return datavs
522
523     def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
524                                     reason):
525         fileutil.make_dirs(self.corruption_advisory_dir)
526         now = time_format.iso_utc(sep="T")
527         si_s = si_b2a(storage_index)
528         # windows can't handle colons in the filename
529         fn = os.path.join(self.corruption_advisory_dir,
530                           "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
531         f = open(fn, "w")
532         f.write("report: Share Corruption\n")
533         f.write("type: %s\n" % share_type)
534         f.write("storage_index: %s\n" % si_s)
535         f.write("share_number: %d\n" % shnum)
536         f.write("\n")
537         f.write(reason)
538         f.write("\n")
539         f.close()
540         log.msg(format=("client claims corruption in (%(share_type)s) " +
541                         "%(si)s-%(shnum)d: %(reason)s"),
542                 share_type=share_type, si=si_s, shnum=shnum, reason=reason,
543                 level=log.SCARY, umid="SGx2fA")
544         return None