]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage/server.py
48e295713c28c2d10e0cfae3fb90807d2ad33828
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage / server.py
1 import os, re, weakref, struct, time
2
3 from foolscap.api import Referenceable
4 from twisted.application import service
5
6 from zope.interface import implements
7 from allmydata.interfaces import RIStorageServer, IStatsProducer
8 from allmydata.util import fileutil, idlib, log, time_format
9 import allmydata # for __full_version__
10
11 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
12 _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
13 from allmydata.storage.lease import LeaseInfo
14 from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
15      create_mutable_sharefile
16 from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
17 from allmydata.storage.crawler import BucketCountingCrawler
18 from allmydata.storage.expirer import LeaseCheckingCrawler
19
20 # storage/
21 # storage/shares/incoming
22 #   incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
23 #   be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
24 # storage/shares/$START/$STORAGEINDEX
25 # storage/shares/$START/$STORAGEINDEX/$SHARENUM
26
27 # Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
28 # base-32 chars).
29
# $SHARENUM matches this regex:
NUM_RE = re.compile(r"^[0-9]+$")
32
33
34
class StorageServer(service.MultiService, Referenceable):
    """Foolscap-reachable storage server.

    Stores immutable and mutable shares on local disk under storedir/shares,
    tracks leases on them, and reports statistics via IStatsProducer.
    """
    implements(RIStorageServer, IStatsProducer)
    # twisted service name
    name = 'storage'
    # which crawler class performs lease expiration; a subclass hook
    LeaseCheckerClass = LeaseCheckingCrawler
39
40     def __init__(self, storedir, nodeid, reserved_space=0,
41                  discard_storage=False, readonly_storage=False,
42                  stats_provider=None,
43                  expiration_enabled=False,
44                  expiration_mode="age",
45                  expiration_override_lease_duration=None,
46                  expiration_cutoff_date=None,
47                  expiration_sharetypes=("mutable", "immutable")):
48         service.MultiService.__init__(self)
49         assert isinstance(nodeid, str)
50         assert len(nodeid) == 20
51         self.my_nodeid = nodeid
52         self.storedir = storedir
53         sharedir = os.path.join(storedir, "shares")
54         fileutil.make_dirs(sharedir)
55         self.sharedir = sharedir
56         # we don't actually create the corruption-advisory dir until necessary
57         self.corruption_advisory_dir = os.path.join(storedir,
58                                                     "corruption-advisories")
59         self.reserved_space = int(reserved_space)
60         self.no_storage = discard_storage
61         self.readonly_storage = readonly_storage
62         self.stats_provider = stats_provider
63         if self.stats_provider:
64             self.stats_provider.register_producer(self)
65         self.incomingdir = os.path.join(sharedir, 'incoming')
66         self._clean_incomplete()
67         fileutil.make_dirs(self.incomingdir)
68         self._active_writers = weakref.WeakKeyDictionary()
69         log.msg("StorageServer created", facility="tahoe.storage")
70
71         if reserved_space:
72             if self.get_available_space() is None:
73                 log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
74                         umin="0wZ27w", level=log.UNUSUAL)
75
76         self.latencies = {"allocate": [], # immutable
77                           "write": [],
78                           "close": [],
79                           "read": [],
80                           "get": [],
81                           "writev": [], # mutable
82                           "readv": [],
83                           "add-lease": [], # both
84                           "renew": [],
85                           "cancel": [],
86                           }
87         self.add_bucket_counter()
88
89         statefile = os.path.join(self.storedir, "lease_checker.state")
90         historyfile = os.path.join(self.storedir, "lease_checker.history")
91         klass = self.LeaseCheckerClass
92         self.lease_checker = klass(self, statefile, historyfile,
93                                    expiration_enabled, expiration_mode,
94                                    expiration_override_lease_duration,
95                                    expiration_cutoff_date,
96                                    expiration_sharetypes)
97         self.lease_checker.setServiceParent(self)
98
99     def __repr__(self):
100         return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
101
102     def add_bucket_counter(self):
103         statefile = os.path.join(self.storedir, "bucket_counter.state")
104         self.bucket_counter = BucketCountingCrawler(self, statefile)
105         self.bucket_counter.setServiceParent(self)
106
107     def count(self, name, delta=1):
108         if self.stats_provider:
109             self.stats_provider.count("storage_server." + name, delta)
110
111     def add_latency(self, category, latency):
112         a = self.latencies[category]
113         a.append(latency)
114         if len(a) > 1000:
115             self.latencies[category] = a[-1000:]
116
117     def get_latencies(self):
118         """Return a dict, indexed by category, that contains a dict of
119         latency numbers for each category. If there are sufficient samples
120         for unambiguous interpretation, each dict will contain the
121         following keys: mean, 01_0_percentile, 10_0_percentile,
122         50_0_percentile (median), 90_0_percentile, 95_0_percentile,
123         99_0_percentile, 99_9_percentile.  If there are insufficient
124         samples for a given percentile to be interpreted unambiguously
125         that percentile will be reported as None. If no samples have been
126         collected for the given category, then that category name will
127         not be present in the return value. """
128         # note that Amazon's Dynamo paper says they use 99.9% percentile.
129         output = {}
130         for category in self.latencies:
131             if not self.latencies[category]:
132                 continue
133             stats = {}
134             samples = self.latencies[category][:]
135             count = len(samples)
136             stats["samplesize"] = count
137             samples.sort()
138             if count > 1:
139                 stats["mean"] = sum(samples) / count
140             else:
141                 stats["mean"] = None
142
143             orderstatlist = [(0.01, "01_0_percentile", 100), (0.1, "10_0_percentile", 10),\
144                              (0.50, "50_0_percentile", 10), (0.90, "90_0_percentile", 10),\
145                              (0.95, "95_0_percentile", 20), (0.99, "99_0_percentile", 100),\
146                              (0.999, "99_9_percentile", 1000)]
147
148             for percentile, percentilestring, minnumtoobserve in orderstatlist:
149                 if count >= minnumtoobserve:
150                     stats[percentilestring] = samples[int(percentile*count)]
151                 else:
152                     stats[percentilestring] = None
153
154             output[category] = stats
155         return output
156
157     def log(self, *args, **kwargs):
158         if "facility" not in kwargs:
159             kwargs["facility"] = "tahoe.storage"
160         return log.msg(*args, **kwargs)
161
    def _clean_incomplete(self):
        # Discard partially-uploaded shares left over from a previous run:
        # everything under shares/incoming is removed at startup.
        fileutil.rm_dir(self.incomingdir)
164
    def get_stats(self):
        """Return a dict of numeric statistics for our stats provider.

        Includes allocated and reserved space, flattened latency figures,
        disk-usage numbers (when the platform supports them), whether we
        are accepting immutable shares, and the last completed bucket
        count, all keyed as 'storage_server.*'.
        """
        # remember: RIStatsProvider requires that our return dict
        # contains numeric values.
        stats = { 'storage_server.allocated': self.allocated_size(), }
        stats['storage_server.reserved_space'] = self.reserved_space
        # flatten the per-category latency dicts into dotted stat names
        for category,ld in self.get_latencies().items():
            for name,v in ld.items():
                stats['storage_server.latencies.%s.%s' % (category, name)] = v

        try:
            disk = fileutil.get_disk_stats(self.sharedir, self.reserved_space)
            writeable = disk['avail'] > 0

            # spacetime predictors should use disk_avail / (d(disk_used)/dt)
            stats['storage_server.disk_total'] = disk['total']
            stats['storage_server.disk_used'] = disk['used']
            stats['storage_server.disk_free_for_root'] = disk['free_for_root']
            stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
            stats['storage_server.disk_avail'] = disk['avail']
        except AttributeError:
            # no disk-stats API on this platform: assume we can write
            writeable = True
        except EnvironmentError:
            # the API exists but the OS call failed: be conservative
            log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
            writeable = False

        if self.readonly_storage:
            stats['storage_server.disk_avail'] = 0
            writeable = False

        stats['storage_server.accepting_immutable_shares'] = int(writeable)
        s = self.bucket_counter.get_state()
        bucket_count = s.get("last-complete-bucket-count")
        if bucket_count:
            stats['storage_server.total_bucket_count'] = bucket_count
        return stats
200
201     def get_available_space(self):
202         """Returns available space for share storage in bytes, or None if no
203         API to get this information is available."""
204
205         if self.readonly_storage:
206             return 0
207         return fileutil.get_available_space(self.sharedir, self.reserved_space)
208
209     def allocated_size(self):
210         space = 0
211         for bw in self._active_writers:
212             space += bw.allocated_size()
213         return space
214
215     def remote_get_version(self):
216         remaining_space = self.get_available_space()
217         if remaining_space is None:
218             # We're on a platform that has no API to get disk stats.
219             remaining_space = 2**64
220
221         version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
222                     { "maximum-immutable-share-size": remaining_space,
223                       "tolerates-immutable-read-overrun": True,
224                       "delete-mutable-shares-with-zero-length-writev": True,
225                       },
226                     "application-version": str(allmydata.__full_version__),
227                     }
228         return version
229
    def remote_allocate_buckets(self, storage_index,
                                renew_secret, cancel_secret,
                                sharenums, allocated_size,
                                canary, owner_num=0):
        """Allocate BucketWriters for the requested immutable shares.

        Returns (alreadygot, bucketwriters): 'alreadygot' is the set of
        every share number we already hold for this storage index (not
        just those in 'sharenums'), and 'bucketwriters' maps each
        newly-allocated share number to its BucketWriter. Shares that
        already exist, are mid-upload in incoming/, or don't fit in the
        remaining space get no BucketWriter.
        """
        # owner_num is not for clients to set, but rather it should be
        # curried into the PersonalStorageServer instance that is dedicated
        # to a particular owner.
        start = time.time()
        self.count("allocate")
        alreadygot = set()
        bucketwriters = {} # k: shnum, v: BucketWriter
        si_dir = storage_index_to_dir(storage_index)
        si_s = si_b2a(storage_index)

        log.msg("storage: allocate_buckets %s" % si_s)

        # in this implementation, the lease information (including secrets)
        # goes into the share files themselves. It could also be put into a
        # separate database. Note that the lease should not be added until
        # the BucketWriter has been closed.
        expire_time = time.time() + 31*24*60*60   # one month from now
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        max_space_per_bucket = allocated_size

        remaining_space = self.get_available_space()
        limited = remaining_space is not None
        if limited:
            # this is a bit conservative, since some of this allocated_size()
            # has already been written to disk, where it will show up in
            # get_available_space.
            remaining_space -= self.allocated_size()
        # self.readonly_storage causes remaining_space <= 0

        # fill alreadygot with all shares that we have, not just the ones
        # they asked about: this will save them a lot of work. Add or update
        # leases for all of them: if they want us to hold shares for this
        # file, they'll want us to hold leases for this file.
        for (shnum, fn) in self._get_bucket_shares(storage_index):
            alreadygot.add(shnum)
            sf = ShareFile(fn)
            sf.add_or_renew_lease(lease_info)

        for shnum in sharenums:
            incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
            finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
            if os.path.exists(finalhome):
                # great! we already have it. easy.
                pass
            elif os.path.exists(incominghome):
                # Note that we don't create BucketWriters for shnums that
                # have a partial share (in incoming/), so if a second upload
                # occurs while the first is still in progress, the second
                # uploader will use different storage servers.
                pass
            elif (not limited) or (remaining_space >= max_space_per_bucket):
                # ok! we need to create the new share file.
                bw = BucketWriter(self, incominghome, finalhome,
                                  max_space_per_bucket, lease_info, canary)
                if self.no_storage:
                    # discard mode: accept the upload but drop the bytes
                    bw.throw_out_all_data = True
                bucketwriters[shnum] = bw
                self._active_writers[bw] = 1
                if limited:
                    remaining_space -= max_space_per_bucket
            else:
                # bummer! not enough space to accept this bucket
                pass

        if bucketwriters:
            fileutil.make_dirs(os.path.join(self.sharedir, si_dir))

        self.add_latency("allocate", time.time() - start)
        return alreadygot, bucketwriters
306
307     def _iter_share_files(self, storage_index):
308         for shnum, filename in self._get_bucket_shares(storage_index):
309             f = open(filename, 'rb')
310             header = f.read(32)
311             f.close()
312             if header[:32] == MutableShareFile.MAGIC:
313                 sf = MutableShareFile(filename, self)
314                 # note: if the share has been migrated, the renew_lease()
315                 # call will throw an exception, with information to help the
316                 # client update the lease.
317             elif header[:4] == struct.pack(">L", 1):
318                 sf = ShareFile(filename)
319             else:
320                 continue # non-sharefile
321             yield sf
322
323     def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
324                          owner_num=1):
325         start = time.time()
326         self.count("add-lease")
327         new_expire_time = time.time() + 31*24*60*60
328         lease_info = LeaseInfo(owner_num,
329                                renew_secret, cancel_secret,
330                                new_expire_time, self.my_nodeid)
331         for sf in self._iter_share_files(storage_index):
332             sf.add_or_renew_lease(lease_info)
333         self.add_latency("add-lease", time.time() - start)
334         return None
335
336     def remote_renew_lease(self, storage_index, renew_secret):
337         start = time.time()
338         self.count("renew")
339         new_expire_time = time.time() + 31*24*60*60
340         found_buckets = False
341         for sf in self._iter_share_files(storage_index):
342             found_buckets = True
343             sf.renew_lease(renew_secret, new_expire_time)
344         self.add_latency("renew", time.time() - start)
345         if not found_buckets:
346             raise IndexError("no such lease to renew")
347
    def remote_cancel_lease(self, storage_index, cancel_secret):
        """Cancel a lease on all shares of this bucket.

        Raises IndexError if we hold no shares for this storage index
        (or, via ShareFile/MutableShareFile.cancel_lease, if the lease
        was not present on a share).
        """
        start = time.time()
        self.count("cancel")

        total_space_freed = 0
        found_buckets = False
        for sf in self._iter_share_files(storage_index):
            # note: if we can't find a lease on one share, we won't bother
            # looking in the others. Unless something broke internally
            # (perhaps we ran out of disk space while adding a lease), the
            # leases on all shares will be identical.
            found_buckets = True
            # this raises IndexError if the lease wasn't present XXXX
            total_space_freed += sf.cancel_lease(cancel_secret)

        if found_buckets:
            # if cancellation deleted the last share, remove the now-empty
            # bucket directory too
            storagedir = os.path.join(self.sharedir,
                                      storage_index_to_dir(storage_index))
            if not os.listdir(storagedir):
                os.rmdir(storagedir)

        if self.stats_provider:
            self.stats_provider.count('storage_server.bytes_freed',
                                      total_space_freed)
        self.add_latency("cancel", time.time() - start)
        if not found_buckets:
            raise IndexError("no such storage index")
375
376     def bucket_writer_closed(self, bw, consumed_size):
377         if self.stats_provider:
378             self.stats_provider.count('storage_server.bytes_added', consumed_size)
379         del self._active_writers[bw]
380
381     def _get_bucket_shares(self, storage_index):
382         """Return a list of (shnum, pathname) tuples for files that hold
383         shares for this storage_index. In each tuple, 'shnum' will always be
384         the integer form of the last component of 'pathname'."""
385         storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
386         try:
387             for f in os.listdir(storagedir):
388                 if NUM_RE.match(f):
389                     filename = os.path.join(storagedir, f)
390                     yield (int(f), filename)
391         except OSError:
392             # Commonly caused by there being no buckets at all.
393             pass
394
395     def remote_get_buckets(self, storage_index):
396         start = time.time()
397         self.count("get")
398         si_s = si_b2a(storage_index)
399         log.msg("storage: get_buckets %s" % si_s)
400         bucketreaders = {} # k: sharenum, v: BucketReader
401         for shnum, filename in self._get_bucket_shares(storage_index):
402             bucketreaders[shnum] = BucketReader(self, filename,
403                                                 storage_index, shnum)
404         self.add_latency("get", time.time() - start)
405         return bucketreaders
406
407     def get_leases(self, storage_index):
408         """Provide an iterator that yields all of the leases attached to this
409         bucket. Each lease is returned as a LeaseInfo instance.
410
411         This method is not for client use.
412         """
413
414         # since all shares get the same lease data, we just grab the leases
415         # from the first share
416         try:
417             shnum, filename = self._get_bucket_shares(storage_index).next()
418             sf = ShareFile(filename)
419             return sf.get_leases()
420         except StopIteration:
421             return iter([])
422
423     def remote_slot_testv_and_readv_and_writev(self, storage_index,
424                                                secrets,
425                                                test_and_write_vectors,
426                                                read_vector):
427         start = time.time()
428         self.count("writev")
429         si_s = si_b2a(storage_index)
430         log.msg("storage: slot_writev %s" % si_s)
431         si_dir = storage_index_to_dir(storage_index)
432         (write_enabler, renew_secret, cancel_secret) = secrets
433         # shares exist if there is a file for them
434         bucketdir = os.path.join(self.sharedir, si_dir)
435         shares = {}
436         if os.path.isdir(bucketdir):
437             for sharenum_s in os.listdir(bucketdir):
438                 try:
439                     sharenum = int(sharenum_s)
440                 except ValueError:
441                     continue
442                 filename = os.path.join(bucketdir, sharenum_s)
443                 msf = MutableShareFile(filename, self)
444                 msf.check_write_enabler(write_enabler, si_s)
445                 shares[sharenum] = msf
446         # write_enabler is good for all existing shares.
447
448         # Now evaluate test vectors.
449         testv_is_good = True
450         for sharenum in test_and_write_vectors:
451             (testv, datav, new_length) = test_and_write_vectors[sharenum]
452             if sharenum in shares:
453                 if not shares[sharenum].check_testv(testv):
454                     self.log("testv failed: [%d]: %r" % (sharenum, testv))
455                     testv_is_good = False
456                     break
457             else:
458                 # compare the vectors against an empty share, in which all
459                 # reads return empty strings.
460                 if not EmptyShare().check_testv(testv):
461                     self.log("testv failed (empty): [%d] %r" % (sharenum,
462                                                                 testv))
463                     testv_is_good = False
464                     break
465
466         # now gather the read vectors, before we do any writes
467         read_data = {}
468         for sharenum, share in shares.items():
469             read_data[sharenum] = share.readv(read_vector)
470
471         ownerid = 1 # TODO
472         expire_time = time.time() + 31*24*60*60   # one month
473         lease_info = LeaseInfo(ownerid,
474                                renew_secret, cancel_secret,
475                                expire_time, self.my_nodeid)
476
477         if testv_is_good:
478             # now apply the write vectors
479             for sharenum in test_and_write_vectors:
480                 (testv, datav, new_length) = test_and_write_vectors[sharenum]
481                 if new_length == 0:
482                     if sharenum in shares:
483                         shares[sharenum].unlink()
484                 else:
485                     if sharenum not in shares:
486                         # allocate a new share
487                         allocated_size = 2000 # arbitrary, really
488                         share = self._allocate_slot_share(bucketdir, secrets,
489                                                           sharenum,
490                                                           allocated_size,
491                                                           owner_num=0)
492                         shares[sharenum] = share
493                     shares[sharenum].writev(datav, new_length)
494                     # and update the lease
495                     shares[sharenum].add_or_renew_lease(lease_info)
496
497             if new_length == 0:
498                 # delete empty bucket directories
499                 if not os.listdir(bucketdir):
500                     os.rmdir(bucketdir)
501
502
503         # all done
504         self.add_latency("writev", time.time() - start)
505         return (testv_is_good, read_data)
506
507     def _allocate_slot_share(self, bucketdir, secrets, sharenum,
508                              allocated_size, owner_num=0):
509         (write_enabler, renew_secret, cancel_secret) = secrets
510         my_nodeid = self.my_nodeid
511         fileutil.make_dirs(bucketdir)
512         filename = os.path.join(bucketdir, "%d" % sharenum)
513         share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
514                                          self)
515         return share
516
517     def remote_slot_readv(self, storage_index, shares, readv):
518         start = time.time()
519         self.count("readv")
520         si_s = si_b2a(storage_index)
521         lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
522                      facility="tahoe.storage", level=log.OPERATIONAL)
523         si_dir = storage_index_to_dir(storage_index)
524         # shares exist if there is a file for them
525         bucketdir = os.path.join(self.sharedir, si_dir)
526         if not os.path.isdir(bucketdir):
527             self.add_latency("readv", time.time() - start)
528             return {}
529         datavs = {}
530         for sharenum_s in os.listdir(bucketdir):
531             try:
532                 sharenum = int(sharenum_s)
533             except ValueError:
534                 continue
535             if sharenum in shares or not shares:
536                 filename = os.path.join(bucketdir, sharenum_s)
537                 msf = MutableShareFile(filename, self)
538                 datavs[sharenum] = msf.readv(readv)
539         log.msg("returning shares %s" % (datavs.keys(),),
540                 facility="tahoe.storage", level=log.NOISY, parent=lp)
541         self.add_latency("readv", time.time() - start)
542         return datavs
543
544     def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
545                                     reason):
546         fileutil.make_dirs(self.corruption_advisory_dir)
547         now = time_format.iso_utc(sep="T")
548         si_s = si_b2a(storage_index)
549         # windows can't handle colons in the filename
550         fn = os.path.join(self.corruption_advisory_dir,
551                           "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
552         f = open(fn, "w")
553         f.write("report: Share Corruption\n")
554         f.write("type: %s\n" % share_type)
555         f.write("storage_index: %s\n" % si_s)
556         f.write("share_number: %d\n" % shnum)
557         f.write("\n")
558         f.write(reason)
559         f.write("\n")
560         f.close()
561         log.msg(format=("client claims corruption in (%(share_type)s) " +
562                         "%(si)s-%(shnum)d: %(reason)s"),
563                 share_type=share_type, si=si_s, shnum=shnum, reason=reason,
564                 level=log.SCARY, umid="SGx2fA")
565         return None