# src/allmydata/storage/server.py (tahoe-lafs)
# scraped from a gitweb blob view; last commit message:
# "trivial: use more specific function for ascii-encoding storage index"
1 import os, re, weakref, struct, time
2
3 from foolscap import Referenceable
4 from twisted.application import service
5
6 from zope.interface import implements
7 from allmydata.interfaces import RIStorageServer, IStatsProducer
8 from allmydata.util import base32, fileutil, log, time_format
9 import allmydata # for __full_version__
10
11 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
12 _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
13 from allmydata.storage.lease import LeaseInfo
14 from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
15      create_mutable_sharefile
16 from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
17 from allmydata.storage.crawler import BucketCountingCrawler
18
19 # storage/
20 # storage/shares/incoming
21 #   incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
22 #   be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
23 # storage/shares/$START/$STORAGEINDEX
24 # storage/shares/$START/$STORAGEINDEX/$SHARENUM
25
26 # Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
27 # base-32 chars).
28
29 # $SHARENUM matches this regex:
30 NUM_RE=re.compile("^[0-9]+$")
31
32
33
class StorageServer(service.MultiService, Referenceable):
    """Foolscap-exposed storage server.

    Shares are stored under storedir/shares/$START/$STORAGEINDEX/$SHARENUM,
    where $START is the first 10 bits (2 base-32 chars) of the storage
    index. Partially-uploaded immutable shares live under shares/incoming/
    until their BucketWriter is closed. This service also manages leases,
    keeps rolling per-category latency samples, reports stats as an
    IStatsProducer, and records client corruption advisories on disk.
    """
    implements(RIStorageServer, IStatsProducer)
    name = 'storage'

    def __init__(self, storedir, nodeid, reserved_space=0,
                 discard_storage=False, readonly_storage=False,
                 stats_provider=None):
        """
        @param storedir: base directory for all storage-server state
        @param nodeid: our 20-byte binary node id
        @param reserved_space: refuse new shares once available disk space
            drops below this many bytes
        @param discard_storage: if True, accept uploads but discard the data
            (testing aid; sets BucketWriter.throw_out_all_data)
        @param readonly_storage: if True, never accept new shares
        @param stats_provider: optional stats provider to register with
        """
        service.MultiService.__init__(self)
        assert isinstance(nodeid, str)
        assert len(nodeid) == 20
        self.my_nodeid = nodeid
        self.storedir = storedir
        sharedir = os.path.join(storedir, "shares")
        fileutil.make_dirs(sharedir)
        self.sharedir = sharedir
        # we don't actually create the corruption-advisory dir until necessary
        self.corruption_advisory_dir = os.path.join(storedir,
                                                    "corruption-advisories")
        self.reserved_space = int(reserved_space)
        self.no_storage = discard_storage
        self.readonly_storage = readonly_storage
        self.stats_provider = stats_provider
        if self.stats_provider:
            self.stats_provider.register_producer(self)
        self.incomingdir = os.path.join(sharedir, 'incoming')
        self._clean_incomplete()
        fileutil.make_dirs(self.incomingdir)
        # weak keys: an abandoned BucketWriter must not pin allocated space
        self._active_writers = weakref.WeakKeyDictionary()
        log.msg("StorageServer created", facility="tahoe.storage")

        if reserved_space:
            if self.get_available_space() is None:
                # was umin= (a typo): the foolscap log keyword is umid=,
                # as used elsewhere in this file
                log.msg("warning: [storage]reserved_space= is set, but this platform does not support statvfs(2), so this reservation cannot be honored",
                        umid="0wZ27w", level=log.UNUSUAL)

        # rolling latency samples, keyed by operation category
        self.latencies = {"allocate": [], # immutable
                          "write": [],
                          "close": [],
                          "read": [],
                          "get": [],
                          "writev": [], # mutable
                          "readv": [],
                          "add-lease": [], # both
                          "renew": [],
                          "cancel": [],
                          }
        self.add_bucket_counter()

    def add_bucket_counter(self):
        """Create and start the background bucket-counting crawler."""
        statefile = os.path.join(self.storedir, "bucket_counter.state")
        self.bucket_counter = BucketCountingCrawler(self, statefile)
        self.bucket_counter.setServiceParent(self)

    def count(self, name, delta=1):
        """Bump the 'storage_server.<name>' counter, if we have a stats
        provider."""
        if self.stats_provider:
            self.stats_provider.count("storage_server." + name, delta)

    def add_latency(self, category, latency):
        """Record one latency sample (seconds), keeping at most the most
        recent 1000 samples per category."""
        a = self.latencies[category]
        a.append(latency)
        if len(a) > 1000:
            self.latencies[category] = a[-1000:]

    def get_latencies(self):
        """Return a dict, indexed by category, that contains a dict of
        latency numbers for each category. Each dict will contain the
        following keys: mean, 01_0_percentile, 10_0_percentile,
        50_0_percentile (median), 90_0_percentile, 95_0_percentile,
        99_0_percentile, 99_9_percentile. If no samples have been collected
        for the given category, then that category name will not be present
        in the return value."""
        # note that Amazon's Dynamo paper says they use 99.9% percentile.
        output = {}
        for category in self.latencies:
            if not self.latencies[category]:
                continue
            stats = {}
            samples = self.latencies[category][:]
            samples.sort()
            count = len(samples)
            # float() guards against Py2 integer truncation if a caller
            # ever records integer samples; no-op for the usual floats
            stats["mean"] = float(sum(samples)) / count
            stats["01_0_percentile"] = samples[int(0.01 * count)]
            stats["10_0_percentile"] = samples[int(0.1 * count)]
            stats["50_0_percentile"] = samples[int(0.5 * count)]
            stats["90_0_percentile"] = samples[int(0.9 * count)]
            stats["95_0_percentile"] = samples[int(0.95 * count)]
            stats["99_0_percentile"] = samples[int(0.99 * count)]
            stats["99_9_percentile"] = samples[int(0.999 * count)]
            output[category] = stats
        return output

    def log(self, *args, **kwargs):
        """log.msg wrapper that defaults facility to tahoe.storage."""
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.storage"
        return log.msg(*args, **kwargs)

    def _clean_incomplete(self):
        # discard any partial uploads left over from a previous run
        fileutil.rm_dir(self.incomingdir)

    def do_statvfs(self):
        # separate method so tests can override it; raises AttributeError
        # on platforms without os.statvfs (e.g. windows)
        return os.statvfs(self.storedir)

    def get_stats(self):
        """Return a dict of stats for our stats provider.

        Remember: RIStatsProvider requires that our return dict contains
        numeric values only.
        """
        stats = { 'storage_server.allocated': self.allocated_size(), }
        stats["storage_server.reserved_space"] = self.reserved_space
        for category,ld in self.get_latencies().items():
            for name,v in ld.items():
                stats['storage_server.latencies.%s.%s' % (category, name)] = v
        writeable = True
        if self.readonly_storage:
            writeable = False
        try:
            s = self.do_statvfs()
            disk_total = s.f_bsize * s.f_blocks
            disk_used = s.f_bsize * (s.f_blocks - s.f_bfree)
            # spacetime predictors should look at the slope of disk_used.
            disk_free_for_root = s.f_bsize * s.f_bfree
            disk_free_for_nonroot = s.f_bsize * s.f_bavail

            # include our local policy here: if we stop accepting shares when
            # the available space drops below 1GB, then include that fact in
            # disk_avail.
            disk_avail = disk_free_for_nonroot - self.reserved_space
            disk_avail = max(disk_avail, 0)
            if self.readonly_storage:
                disk_avail = 0
            if disk_avail == 0:
                writeable = False

            # spacetime predictors should use disk_avail / (d(disk_used)/dt)
            stats["storage_server.disk_total"] = disk_total
            stats["storage_server.disk_used"] = disk_used
            stats["storage_server.disk_free_for_root"] = disk_free_for_root
            stats["storage_server.disk_free_for_nonroot"] = disk_free_for_nonroot
            stats["storage_server.disk_avail"] = disk_avail
        except AttributeError:
            # os.statvfs is available only on unix
            pass
        stats["storage_server.accepting_immutable_shares"] = int(writeable)
        s = self.bucket_counter.get_state()
        bucket_count = s.get("last-complete-bucket-count")
        if bucket_count:
            stats["storage_server.total_bucket_count"] = bucket_count
        return stats

    def stat_disk(self, d):
        """Return the number of bytes in directory d's filesystem that are
        available to non-root users."""
        s = os.statvfs(d)
        # s.f_bavail: available to non-root users
        disk_avail = s.f_bsize * s.f_bavail
        return disk_avail

    def get_available_space(self):
        """Return bytes available for new shares (after reservation), 0 if
        we are read-only, or None if it cannot be measured (windows)."""
        try:
            disk_avail = self.stat_disk(self.storedir)
            disk_avail -= self.reserved_space
        except AttributeError:
            disk_avail = None
        if self.readonly_storage:
            disk_avail = 0
        return disk_avail

    def allocated_size(self):
        """Return the total bytes promised to currently-open BucketWriters."""
        space = 0
        for bw in self._active_writers:
            space += bw.allocated_size()
        return space

    def remote_get_version(self):
        """Report our protocol version and capabilities to clients."""
        remaining_space = self.get_available_space()
        if remaining_space is None:
            # we're on a platform that doesn't have 'df', so make a vague
            # guess.
            remaining_space = 2**64
        version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                    { "maximum-immutable-share-size": remaining_space,
                      "tolerates-immutable-read-overrun": True,
                      "delete-mutable-shares-with-zero-length-writev": True,
                      },
                    "application-version": str(allmydata.__full_version__),
                    }
        return version

    def remote_allocate_buckets(self, storage_index,
                                renew_secret, cancel_secret,
                                sharenums, allocated_size,
                                canary, owner_num=0):
        """Allocate BucketWriters for an immutable upload.

        Returns (alreadygot, bucketwriters): the set of share numbers we
        already hold (leases renewed), and a dict of shnum->BucketWriter
        for the new shares we agreed to accept. Shares may be refused when
        space is low or an upload for them is already in progress.
        """
        # owner_num is not for clients to set, but rather it should be
        # curried into the PersonalStorageServer instance that is dedicated
        # to a particular owner.
        start = time.time()
        self.count("allocate")
        alreadygot = set()
        bucketwriters = {} # k: shnum, v: BucketWriter
        si_dir = storage_index_to_dir(storage_index)
        si_s = si_b2a(storage_index)

        log.msg("storage: allocate_buckets %s" % si_s)

        # in this implementation, the lease information (including secrets)
        # goes into the share files themselves. It could also be put into a
        # separate database. Note that the lease should not be added until
        # the BucketWriter has been closed.
        expire_time = time.time() + 31*24*60*60
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        max_space_per_bucket = allocated_size

        remaining_space = self.get_available_space()
        limited = remaining_space is not None
        if limited:
            # this is a bit conservative, since some of this allocated_size()
            # has already been written to disk, where it will show up in
            # get_available_space.
            remaining_space -= self.allocated_size()

        # fill alreadygot with all shares that we have, not just the ones
        # they asked about: this will save them a lot of work. Add or update
        # leases for all of them: if they want us to hold shares for this
        # file, they'll want us to hold leases for this file.
        for (shnum, fn) in self._get_bucket_shares(storage_index):
            alreadygot.add(shnum)
            sf = ShareFile(fn)
            sf.add_or_renew_lease(lease_info)

        # self.readonly_storage causes remaining_space=0

        for shnum in sharenums:
            incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
            finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
            if os.path.exists(finalhome):
                # great! we already have it. easy.
                pass
            elif os.path.exists(incominghome):
                # Note that we don't create BucketWriters for shnums that
                # have a partial share (in incoming/), so if a second upload
                # occurs while the first is still in progress, the second
                # uploader will use different storage servers.
                pass
            elif (not limited) or (remaining_space >= max_space_per_bucket):
                # ok! we need to create the new share file.
                bw = BucketWriter(self, incominghome, finalhome,
                                  max_space_per_bucket, lease_info, canary)
                if self.no_storage:
                    bw.throw_out_all_data = True
                bucketwriters[shnum] = bw
                self._active_writers[bw] = 1
                if limited:
                    remaining_space -= max_space_per_bucket
            else:
                # bummer! not enough space to accept this bucket
                pass

        if bucketwriters:
            fileutil.make_dirs(os.path.join(self.sharedir, si_dir))

        self.add_latency("allocate", time.time() - start)
        return alreadygot, bucketwriters

    def _iter_share_files(self, storage_index):
        """Yield a ShareFile or MutableShareFile for each recognizable
        share file under the given storage index; skip anything else."""
        for shnum, filename in self._get_bucket_shares(storage_index):
            f = open(filename, 'rb')
            header = f.read(32)
            f.close()
            if header[:32] == MutableShareFile.MAGIC:
                sf = MutableShareFile(filename, self)
                # note: if the share has been migrated, the renew_lease()
                # call will throw an exception, with information to help the
                # client update the lease.
            elif header[:4] == struct.pack(">L", 1):
                # immutable share files start with version number 1
                sf = ShareFile(filename)
            else:
                continue # non-sharefile
            yield sf

    def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
                         owner_num=1):
        """Add (or renew) a one-month lease on every share we hold for
        this storage index. Succeeds silently if we hold no shares."""
        start = time.time()
        self.count("add-lease")
        new_expire_time = time.time() + 31*24*60*60
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               new_expire_time, self.my_nodeid)
        for sf in self._iter_share_files(storage_index):
            sf.add_or_renew_lease(lease_info)
        self.add_latency("add-lease", time.time() - start)
        return None

    def remote_renew_lease(self, storage_index, renew_secret):
        """Renew an existing lease on every share we hold for this storage
        index. Raises IndexError if we hold no shares for it."""
        start = time.time()
        self.count("renew")
        new_expire_time = time.time() + 31*24*60*60
        found_buckets = False
        for sf in self._iter_share_files(storage_index):
            found_buckets = True
            sf.renew_lease(renew_secret, new_expire_time)
        self.add_latency("renew", time.time() - start)
        if not found_buckets:
            raise IndexError("no such lease to renew")

    def remote_cancel_lease(self, storage_index, cancel_secret):
        """Cancel a lease on every share we hold for this storage index,
        removing the bucket directory if it becomes empty. Raises
        IndexError if we hold no shares for it."""
        start = time.time()
        self.count("cancel")

        total_space_freed = 0
        found_buckets = False
        for sf in self._iter_share_files(storage_index):
            # note: if we can't find a lease on one share, we won't bother
            # looking in the others. Unless something broke internally
            # (perhaps we ran out of disk space while adding a lease), the
            # leases on all shares will be identical.
            found_buckets = True
            # this raises IndexError if the lease wasn't present XXXX
            total_space_freed += sf.cancel_lease(cancel_secret)

        if found_buckets:
            storagedir = os.path.join(self.sharedir,
                                      storage_index_to_dir(storage_index))
            if not os.listdir(storagedir):
                os.rmdir(storagedir)

        if self.stats_provider:
            self.stats_provider.count('storage_server.bytes_freed',
                                      total_space_freed)
        self.add_latency("cancel", time.time() - start)
        if not found_buckets:
            raise IndexError("no such storage index")

    def bucket_writer_closed(self, bw, consumed_size):
        """Callback from BucketWriter.close(): account for the bytes
        actually written and release the writer's space reservation."""
        if self.stats_provider:
            self.stats_provider.count('storage_server.bytes_added', consumed_size)
        del self._active_writers[bw]

    def _get_bucket_shares(self, storage_index):
        """Return a list of (shnum, pathname) tuples for files that hold
        shares for this storage_index. In each tuple, 'shnum' will always be
        the integer form of the last component of 'pathname'."""
        storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
        try:
            for f in os.listdir(storagedir):
                if NUM_RE.match(f):
                    filename = os.path.join(storagedir, f)
                    yield (int(f), filename)
        except OSError:
            # Commonly caused by there being no buckets at all.
            pass

    def remote_get_buckets(self, storage_index):
        """Return a dict mapping share number to a BucketReader for every
        immutable share we hold for this storage index."""
        start = time.time()
        self.count("get")
        si_s = si_b2a(storage_index)
        log.msg("storage: get_buckets %s" % si_s)
        bucketreaders = {} # k: sharenum, v: BucketReader
        for shnum, filename in self._get_bucket_shares(storage_index):
            bucketreaders[shnum] = BucketReader(self, filename,
                                                storage_index, shnum)
        self.add_latency("get", time.time() - start)
        return bucketreaders

    def get_leases(self, storage_index):
        """Provide an iterator that yields all of the leases attached to this
        bucket. Each lease is returned as a tuple of (owner_num,
        renew_secret, cancel_secret, expiration_time).

        This method is not for client use.
        """

        # since all shares get the same lease data, we just grab the leases
        # from the first share
        try:
            shnum, filename = self._get_bucket_shares(storage_index).next()
            sf = ShareFile(filename)
            return sf.iter_leases()
        except StopIteration:
            return iter([])

    def remote_slot_testv_and_readv_and_writev(self, storage_index,
                                               secrets,
                                               test_and_write_vectors,
                                               read_vector):
        """Atomically test, read, and write a mutable slot.

        test_and_write_vectors maps shnum -> (testv, datav, new_length);
        every test vector must pass before any write vector is applied.
        The read vector is evaluated (against all existing shares) before
        the writes. new_length==0 deletes the share. Returns
        (testv_is_good, read_data).
        """
        start = time.time()
        self.count("writev")
        si_s = si_b2a(storage_index)
        log.msg("storage: slot_writev %s" % si_s)
        si_dir = storage_index_to_dir(storage_index)
        (write_enabler, renew_secret, cancel_secret) = secrets
        # shares exist if there is a file for them
        bucketdir = os.path.join(self.sharedir, si_dir)
        shares = {}
        if os.path.isdir(bucketdir):
            for sharenum_s in os.listdir(bucketdir):
                try:
                    sharenum = int(sharenum_s)
                except ValueError:
                    continue
                filename = os.path.join(bucketdir, sharenum_s)
                msf = MutableShareFile(filename, self)
                msf.check_write_enabler(write_enabler, si_s)
                shares[sharenum] = msf
        # write_enabler is good for all existing shares.

        # Now evaluate test vectors.
        testv_is_good = True
        for sharenum in test_and_write_vectors:
            (testv, datav, new_length) = test_and_write_vectors[sharenum]
            if sharenum in shares:
                if not shares[sharenum].check_testv(testv):
                    self.log("testv failed: [%d]: %r" % (sharenum, testv))
                    testv_is_good = False
                    break
            else:
                # compare the vectors against an empty share, in which all
                # reads return empty strings.
                if not EmptyShare().check_testv(testv):
                    self.log("testv failed (empty): [%d] %r" % (sharenum,
                                                                testv))
                    testv_is_good = False
                    break

        # now gather the read vectors, before we do any writes
        read_data = {}
        for sharenum, share in shares.items():
            read_data[sharenum] = share.readv(read_vector)

        ownerid = 1 # TODO
        expire_time = time.time() + 31*24*60*60   # one month
        lease_info = LeaseInfo(ownerid,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        if testv_is_good:
            # now apply the write vectors
            any_deleted = False
            for sharenum in test_and_write_vectors:
                (testv, datav, new_length) = test_and_write_vectors[sharenum]
                if new_length == 0:
                    # new_length==0 is a deletion request
                    if sharenum in shares:
                        shares[sharenum].unlink()
                        any_deleted = True
                else:
                    if sharenum not in shares:
                        # allocate a new share
                        allocated_size = 2000 # arbitrary, really
                        share = self._allocate_slot_share(bucketdir, secrets,
                                                          sharenum,
                                                          allocated_size,
                                                          owner_num=0)
                        shares[sharenum] = share
                    shares[sharenum].writev(datav, new_length)
                    # and update the lease
                    shares[sharenum].add_or_renew_lease(lease_info)

            # remove the bucket directory if the last share was just
            # deleted. (The previous code examined only the *final* loop
            # iteration's new_length -- raising NameError when
            # test_and_write_vectors was empty -- and could crash if
            # bucketdir never existed.)
            if any_deleted and os.path.isdir(bucketdir):
                if not os.listdir(bucketdir):
                    os.rmdir(bucketdir)

        # all done
        self.add_latency("writev", time.time() - start)
        return (testv_is_good, read_data)

    def _allocate_slot_share(self, bucketdir, secrets, sharenum,
                             allocated_size, owner_num=0):
        """Create (on disk) and return a new mutable share file for this
        slot, bound to the write_enabler in 'secrets'."""
        (write_enabler, renew_secret, cancel_secret) = secrets
        my_nodeid = self.my_nodeid
        fileutil.make_dirs(bucketdir)
        filename = os.path.join(bucketdir, "%d" % sharenum)
        share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
                                         self)
        return share

    def remote_slot_readv(self, storage_index, shares, readv):
        """Read from a mutable slot. 'shares' selects which share numbers
        to read; an empty list means all of them. Returns a dict mapping
        shnum to the data vector."""
        start = time.time()
        self.count("readv")
        si_s = si_b2a(storage_index)
        lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
                     facility="tahoe.storage", level=log.OPERATIONAL)
        si_dir = storage_index_to_dir(storage_index)
        # shares exist if there is a file for them
        bucketdir = os.path.join(self.sharedir, si_dir)
        if not os.path.isdir(bucketdir):
            self.add_latency("readv", time.time() - start)
            return {}
        datavs = {}
        for sharenum_s in os.listdir(bucketdir):
            try:
                sharenum = int(sharenum_s)
            except ValueError:
                continue
            if sharenum in shares or not shares:
                filename = os.path.join(bucketdir, sharenum_s)
                msf = MutableShareFile(filename, self)
                datavs[sharenum] = msf.readv(readv)
        log.msg("returning shares %s" % (datavs.keys(),),
                facility="tahoe.storage", level=log.NOISY, parent=lp)
        self.add_latency("readv", time.time() - start)
        return datavs

    def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
                                    reason):
        """Record a client's claim that one of our shares is corrupt, as a
        report file in the corruption-advisories directory plus a SCARY
        log entry."""
        fileutil.make_dirs(self.corruption_advisory_dir)
        now = time_format.iso_utc(sep="T")
        si_s = si_b2a(storage_index)
        # windows can't handle colons in the filename: strip them from the
        # filename component only (the old code ran .replace() on the whole
        # joined path, which would also mangle a drive-letter colon)
        fn = os.path.join(self.corruption_advisory_dir,
                          ("%s--%s-%d" % (now, si_s, shnum)).replace(":",""))
        f = open(fn, "w")
        f.write("report: Share Corruption\n")
        f.write("type: %s\n" % share_type)
        f.write("storage_index: %s\n" % si_s)
        f.write("share_number: %d\n" % shnum)
        f.write("\n")
        f.write(reason)
        f.write("\n")
        f.close()
        log.msg(format=("client claims corruption in (%(share_type)s) " +
                        "%(si)s-%(shnum)d: %(reason)s"),
                share_type=share_type, si=si_s, shnum=shnum, reason=reason,
                level=log.SCARY, umid="SGx2fA")
        return None
558