# src/allmydata/storage/server.py -- the Tahoe-LAFS storage server.
# Note: si_b2a/si_a2b/storage_index_to_dir were moved out of this file
# into storage/common.py; they are re-imported below for compatibility.
1 import os, re, weakref, struct, time
2
3 from foolscap import Referenceable
4 from twisted.application import service
5
6 from zope.interface import implements
7 from allmydata.interfaces import RIStorageServer, IStatsProducer
8 from allmydata.util import base32, fileutil, log, time_format
9 import allmydata # for __full_version__
10
11 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
12 _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
13 from allmydata.storage.lease import LeaseInfo
14 from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
15      create_mutable_sharefile
16 from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
17
18 # storage/
19 # storage/shares/incoming
20 #   incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
21 #   be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
22 # storage/shares/$START/$STORAGEINDEX
23 # storage/shares/$START/$STORAGEINDEX/$SHARENUM
24
25 # Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
26 # base-32 chars).
27
# $SHARENUM matches this regex:
NUM_RE = re.compile(r"^[0-9]+$")
30
31
32
33 class StorageServer(service.MultiService, Referenceable):
34     implements(RIStorageServer, IStatsProducer)
35     name = 'storage'
36
37     def __init__(self, storedir, nodeid, reserved_space=0,
38                  discard_storage=False, readonly_storage=False,
39                  stats_provider=None):
40         service.MultiService.__init__(self)
41         assert isinstance(nodeid, str)
42         assert len(nodeid) == 20
43         self.my_nodeid = nodeid
44         self.storedir = storedir
45         sharedir = os.path.join(storedir, "shares")
46         fileutil.make_dirs(sharedir)
47         self.sharedir = sharedir
48         # we don't actually create the corruption-advisory dir until necessary
49         self.corruption_advisory_dir = os.path.join(storedir,
50                                                     "corruption-advisories")
51         self.reserved_space = int(reserved_space)
52         self.no_storage = discard_storage
53         self.readonly_storage = readonly_storage
54         self.stats_provider = stats_provider
55         if self.stats_provider:
56             self.stats_provider.register_producer(self)
57         self.incomingdir = os.path.join(sharedir, 'incoming')
58         self._clean_incomplete()
59         fileutil.make_dirs(self.incomingdir)
60         self._active_writers = weakref.WeakKeyDictionary()
61         lp = log.msg("StorageServer created", facility="tahoe.storage")
62
63         if reserved_space:
64             if self.get_available_space() is None:
65                 log.msg("warning: [storage]reserved_space= is set, but this platform does not support statvfs(2), so this reservation cannot be honored",
66                         umin="0wZ27w", level=log.UNUSUAL)
67
68         self.latencies = {"allocate": [], # immutable
69                           "write": [],
70                           "close": [],
71                           "read": [],
72                           "get": [],
73                           "writev": [], # mutable
74                           "readv": [],
75                           "add-lease": [], # both
76                           "renew": [],
77                           "cancel": [],
78                           }
79
80     def count(self, name, delta=1):
81         if self.stats_provider:
82             self.stats_provider.count("storage_server." + name, delta)
83
84     def add_latency(self, category, latency):
85         a = self.latencies[category]
86         a.append(latency)
87         if len(a) > 1000:
88             self.latencies[category] = a[-1000:]
89
90     def get_latencies(self):
91         """Return a dict, indexed by category, that contains a dict of
92         latency numbers for each category. Each dict will contain the
93         following keys: mean, 01_0_percentile, 10_0_percentile,
94         50_0_percentile (median), 90_0_percentile, 95_0_percentile,
95         99_0_percentile, 99_9_percentile. If no samples have been collected
96         for the given category, then that category name will not be present
97         in the return value."""
98         # note that Amazon's Dynamo paper says they use 99.9% percentile.
99         output = {}
100         for category in self.latencies:
101             if not self.latencies[category]:
102                 continue
103             stats = {}
104             samples = self.latencies[category][:]
105             samples.sort()
106             count = len(samples)
107             stats["mean"] = sum(samples) / count
108             stats["01_0_percentile"] = samples[int(0.01 * count)]
109             stats["10_0_percentile"] = samples[int(0.1 * count)]
110             stats["50_0_percentile"] = samples[int(0.5 * count)]
111             stats["90_0_percentile"] = samples[int(0.9 * count)]
112             stats["95_0_percentile"] = samples[int(0.95 * count)]
113             stats["99_0_percentile"] = samples[int(0.99 * count)]
114             stats["99_9_percentile"] = samples[int(0.999 * count)]
115             output[category] = stats
116         return output
117
118     def log(self, *args, **kwargs):
119         if "facility" not in kwargs:
120             kwargs["facility"] = "tahoe.storage"
121         return log.msg(*args, **kwargs)
122
    def _clean_incomplete(self):
        # Discard partially-uploaded shares left over from a previous run
        # by removing the whole incoming/ directory. __init__ recreates
        # the (empty) directory immediately after calling this.
        fileutil.rm_dir(self.incomingdir)
125
    def do_statvfs(self):
        # Thin wrapper around os.statvfs on our storage directory. Raises
        # AttributeError on platforms without statvfs (e.g. windows).
        # NOTE(review): presumably kept as a separate method so tests can
        # stub it out -- confirm.
        return os.statvfs(self.storedir)
128
129     def get_stats(self):
130         # remember: RIStatsProvider requires that our return dict
131         # contains numeric values.
132         stats = { 'storage_server.allocated': self.allocated_size(), }
133         stats["storage_server.reserved_space"] = self.reserved_space
134         for category,ld in self.get_latencies().items():
135             for name,v in ld.items():
136                 stats['storage_server.latencies.%s.%s' % (category, name)] = v
137         writeable = True
138         if self.readonly_storage:
139             writeable = False
140         try:
141             s = self.do_statvfs()
142             disk_total = s.f_bsize * s.f_blocks
143             disk_used = s.f_bsize * (s.f_blocks - s.f_bfree)
144             # spacetime predictors should look at the slope of disk_used.
145             disk_avail = s.f_bsize * s.f_bavail # available to non-root users
146             # include our local policy here: if we stop accepting shares when
147             # the available space drops below 1GB, then include that fact in
148             # disk_avail.
149             disk_avail -= self.reserved_space
150             disk_avail = max(disk_avail, 0)
151             if self.readonly_storage:
152                 disk_avail = 0
153             if disk_avail == 0:
154                 writeable = False
155
156             # spacetime predictors should use disk_avail / (d(disk_used)/dt)
157             stats["storage_server.disk_total"] = disk_total
158             stats["storage_server.disk_used"] = disk_used
159             stats["storage_server.disk_avail"] = disk_avail
160         except AttributeError:
161             # os.statvfs is available only on unix
162             pass
163         stats["storage_server.accepting_immutable_shares"] = int(writeable)
164         return stats
165
166
167     def stat_disk(self, d):
168         s = os.statvfs(d)
169         # s.f_bavail: available to non-root users
170         disk_avail = s.f_bsize * s.f_bavail
171         return disk_avail
172
173     def get_available_space(self):
174         # returns None if it cannot be measured (windows)
175         try:
176             disk_avail = self.stat_disk(self.storedir)
177             disk_avail -= self.reserved_space
178         except AttributeError:
179             disk_avail = None
180         if self.readonly_storage:
181             disk_avail = 0
182         return disk_avail
183
184     def allocated_size(self):
185         space = 0
186         for bw in self._active_writers:
187             space += bw.allocated_size()
188         return space
189
190     def remote_get_version(self):
191         remaining_space = self.get_available_space()
192         if remaining_space is None:
193             # we're on a platform that doesn't have 'df', so make a vague
194             # guess.
195             remaining_space = 2**64
196         version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
197                     { "maximum-immutable-share-size": remaining_space,
198                       "tolerates-immutable-read-overrun": True,
199                       "delete-mutable-shares-with-zero-length-writev": True,
200                       },
201                     "application-version": str(allmydata.__full_version__),
202                     }
203         return version
204
    def remote_allocate_buckets(self, storage_index,
                                renew_secret, cancel_secret,
                                sharenums, allocated_size,
                                canary, owner_num=0):
        """Ask to store shares for 'storage_index'.

        Returns (alreadygot, bucketwriters): 'alreadygot' is the set of
        *all* share numbers we already hold for this storage index (not
        just those in 'sharenums'), and 'bucketwriters' maps shnum to a
        BucketWriter for each requested share we are willing to accept.
        A requested share gets no BucketWriter when we already have it,
        when another upload of it is still in progress, or when there is
        not enough space for it.
        """
        # owner_num is not for clients to set, but rather it should be
        # curried into the PersonalStorageServer instance that is dedicated
        # to a particular owner.
        start = time.time()
        self.count("allocate")
        alreadygot = set()
        bucketwriters = {} # k: shnum, v: BucketWriter
        si_dir = storage_index_to_dir(storage_index)
        si_s = si_b2a(storage_index)

        log.msg("storage: allocate_buckets %s" % si_s)

        # in this implementation, the lease information (including secrets)
        # goes into the share files themselves. It could also be put into a
        # separate database. Note that the lease should not be added until
        # the BucketWriter has been closed.
        expire_time = time.time() + 31*24*60*60
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        max_space_per_bucket = allocated_size

        remaining_space = self.get_available_space()
        # remaining_space is None when free space cannot be measured
        limited = remaining_space is not None
        if limited:
            # this is a bit conservative, since some of this allocated_size()
            # has already been written to disk, where it will show up in
            # get_available_space.
            remaining_space -= self.allocated_size()

        # fill alreadygot with all shares that we have, not just the ones
        # they asked about: this will save them a lot of work. Add or update
        # leases for all of them: if they want us to hold shares for this
        # file, they'll want us to hold leases for this file.
        for (shnum, fn) in self._get_bucket_shares(storage_index):
            alreadygot.add(shnum)
            sf = ShareFile(fn)
            sf.add_or_renew_lease(lease_info)

        # self.readonly_storage causes remaining_space=0

        for shnum in sharenums:
            incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
            finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
            if os.path.exists(finalhome):
                # great! we already have it. easy.
                pass
            elif os.path.exists(incominghome):
                # Note that we don't create BucketWriters for shnums that
                # have a partial share (in incoming/), so if a second upload
                # occurs while the first is still in progress, the second
                # uploader will use different storage servers.
                pass
            elif (not limited) or (remaining_space >= max_space_per_bucket):
                # ok! we need to create the new share file.
                bw = BucketWriter(self, incominghome, finalhome,
                                  max_space_per_bucket, lease_info, canary)
                if self.no_storage:
                    # discard mode: accept the upload but drop the data
                    bw.throw_out_all_data = True
                bucketwriters[shnum] = bw
                self._active_writers[bw] = 1
                if limited:
                    remaining_space -= max_space_per_bucket
            else:
                # bummer! not enough space to accept this bucket
                pass

        if bucketwriters:
            fileutil.make_dirs(os.path.join(self.sharedir, si_dir))

        self.add_latency("allocate", time.time() - start)
        return alreadygot, bucketwriters
282
283     def _iter_share_files(self, storage_index):
284         for shnum, filename in self._get_bucket_shares(storage_index):
285             f = open(filename, 'rb')
286             header = f.read(32)
287             f.close()
288             if header[:32] == MutableShareFile.MAGIC:
289                 sf = MutableShareFile(filename, self)
290                 # note: if the share has been migrated, the renew_lease()
291                 # call will throw an exception, with information to help the
292                 # client update the lease.
293             elif header[:4] == struct.pack(">L", 1):
294                 sf = ShareFile(filename)
295             else:
296                 continue # non-sharefile
297             yield sf
298
299     def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
300                          owner_num=1):
301         start = time.time()
302         self.count("add-lease")
303         new_expire_time = time.time() + 31*24*60*60
304         lease_info = LeaseInfo(owner_num,
305                                renew_secret, cancel_secret,
306                                new_expire_time, self.my_nodeid)
307         for sf in self._iter_share_files(storage_index):
308             sf.add_or_renew_lease(lease_info)
309         self.add_latency("add-lease", time.time() - start)
310         return None
311
312     def remote_renew_lease(self, storage_index, renew_secret):
313         start = time.time()
314         self.count("renew")
315         new_expire_time = time.time() + 31*24*60*60
316         found_buckets = False
317         for sf in self._iter_share_files(storage_index):
318             found_buckets = True
319             sf.renew_lease(renew_secret, new_expire_time)
320         self.add_latency("renew", time.time() - start)
321         if not found_buckets:
322             raise IndexError("no such lease to renew")
323
324     def remote_cancel_lease(self, storage_index, cancel_secret):
325         start = time.time()
326         self.count("cancel")
327
328         total_space_freed = 0
329         found_buckets = False
330         for sf in self._iter_share_files(storage_index):
331             # note: if we can't find a lease on one share, we won't bother
332             # looking in the others. Unless something broke internally
333             # (perhaps we ran out of disk space while adding a lease), the
334             # leases on all shares will be identical.
335             found_buckets = True
336             # this raises IndexError if the lease wasn't present XXXX
337             total_space_freed += sf.cancel_lease(cancel_secret)
338
339         if found_buckets:
340             storagedir = os.path.join(self.sharedir,
341                                       storage_index_to_dir(storage_index))
342             if not os.listdir(storagedir):
343                 os.rmdir(storagedir)
344
345         if self.stats_provider:
346             self.stats_provider.count('storage_server.bytes_freed',
347                                       total_space_freed)
348         self.add_latency("cancel", time.time() - start)
349         if not found_buckets:
350             raise IndexError("no such storage index")
351
352     def bucket_writer_closed(self, bw, consumed_size):
353         if self.stats_provider:
354             self.stats_provider.count('storage_server.bytes_added', consumed_size)
355         del self._active_writers[bw]
356
357     def _get_bucket_shares(self, storage_index):
358         """Return a list of (shnum, pathname) tuples for files that hold
359         shares for this storage_index. In each tuple, 'shnum' will always be
360         the integer form of the last component of 'pathname'."""
361         storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
362         try:
363             for f in os.listdir(storagedir):
364                 if NUM_RE.match(f):
365                     filename = os.path.join(storagedir, f)
366                     yield (int(f), filename)
367         except OSError:
368             # Commonly caused by there being no buckets at all.
369             pass
370
371     def remote_get_buckets(self, storage_index):
372         start = time.time()
373         self.count("get")
374         si_s = si_b2a(storage_index)
375         log.msg("storage: get_buckets %s" % si_s)
376         bucketreaders = {} # k: sharenum, v: BucketReader
377         for shnum, filename in self._get_bucket_shares(storage_index):
378             bucketreaders[shnum] = BucketReader(self, filename,
379                                                 storage_index, shnum)
380         self.add_latency("get", time.time() - start)
381         return bucketreaders
382
383     def get_leases(self, storage_index):
384         """Provide an iterator that yields all of the leases attached to this
385         bucket. Each lease is returned as a tuple of (owner_num,
386         renew_secret, cancel_secret, expiration_time).
387
388         This method is not for client use.
389         """
390
391         # since all shares get the same lease data, we just grab the leases
392         # from the first share
393         try:
394             shnum, filename = self._get_bucket_shares(storage_index).next()
395             sf = ShareFile(filename)
396             return sf.iter_leases()
397         except StopIteration:
398             return iter([])
399
400     def remote_slot_testv_and_readv_and_writev(self, storage_index,
401                                                secrets,
402                                                test_and_write_vectors,
403                                                read_vector):
404         start = time.time()
405         self.count("writev")
406         si_s = si_b2a(storage_index)
407         lp = log.msg("storage: slot_writev %s" % si_s)
408         si_dir = storage_index_to_dir(storage_index)
409         (write_enabler, renew_secret, cancel_secret) = secrets
410         # shares exist if there is a file for them
411         bucketdir = os.path.join(self.sharedir, si_dir)
412         shares = {}
413         if os.path.isdir(bucketdir):
414             for sharenum_s in os.listdir(bucketdir):
415                 try:
416                     sharenum = int(sharenum_s)
417                 except ValueError:
418                     continue
419                 filename = os.path.join(bucketdir, sharenum_s)
420                 msf = MutableShareFile(filename, self)
421                 msf.check_write_enabler(write_enabler, si_s)
422                 shares[sharenum] = msf
423         # write_enabler is good for all existing shares.
424
425         # Now evaluate test vectors.
426         testv_is_good = True
427         for sharenum in test_and_write_vectors:
428             (testv, datav, new_length) = test_and_write_vectors[sharenum]
429             if sharenum in shares:
430                 if not shares[sharenum].check_testv(testv):
431                     self.log("testv failed: [%d]: %r" % (sharenum, testv))
432                     testv_is_good = False
433                     break
434             else:
435                 # compare the vectors against an empty share, in which all
436                 # reads return empty strings.
437                 if not EmptyShare().check_testv(testv):
438                     self.log("testv failed (empty): [%d] %r" % (sharenum,
439                                                                 testv))
440                     testv_is_good = False
441                     break
442
443         # now gather the read vectors, before we do any writes
444         read_data = {}
445         for sharenum, share in shares.items():
446             read_data[sharenum] = share.readv(read_vector)
447
448         ownerid = 1 # TODO
449         expire_time = time.time() + 31*24*60*60   # one month
450         lease_info = LeaseInfo(ownerid,
451                                renew_secret, cancel_secret,
452                                expire_time, self.my_nodeid)
453
454         if testv_is_good:
455             # now apply the write vectors
456             for sharenum in test_and_write_vectors:
457                 (testv, datav, new_length) = test_and_write_vectors[sharenum]
458                 if new_length == 0:
459                     if sharenum in shares:
460                         shares[sharenum].unlink()
461                 else:
462                     if sharenum not in shares:
463                         # allocate a new share
464                         allocated_size = 2000 # arbitrary, really
465                         share = self._allocate_slot_share(bucketdir, secrets,
466                                                           sharenum,
467                                                           allocated_size,
468                                                           owner_num=0)
469                         shares[sharenum] = share
470                     shares[sharenum].writev(datav, new_length)
471                     # and update the lease
472                     shares[sharenum].add_or_renew_lease(lease_info)
473
474             if new_length == 0:
475                 # delete empty bucket directories
476                 if not os.listdir(bucketdir):
477                     os.rmdir(bucketdir)
478
479
480         # all done
481         self.add_latency("writev", time.time() - start)
482         return (testv_is_good, read_data)
483
484     def _allocate_slot_share(self, bucketdir, secrets, sharenum,
485                              allocated_size, owner_num=0):
486         (write_enabler, renew_secret, cancel_secret) = secrets
487         my_nodeid = self.my_nodeid
488         fileutil.make_dirs(bucketdir)
489         filename = os.path.join(bucketdir, "%d" % sharenum)
490         share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
491                                          self)
492         return share
493
494     def remote_slot_readv(self, storage_index, shares, readv):
495         start = time.time()
496         self.count("readv")
497         si_s = si_b2a(storage_index)
498         lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
499                      facility="tahoe.storage", level=log.OPERATIONAL)
500         si_dir = storage_index_to_dir(storage_index)
501         # shares exist if there is a file for them
502         bucketdir = os.path.join(self.sharedir, si_dir)
503         if not os.path.isdir(bucketdir):
504             self.add_latency("readv", time.time() - start)
505             return {}
506         datavs = {}
507         for sharenum_s in os.listdir(bucketdir):
508             try:
509                 sharenum = int(sharenum_s)
510             except ValueError:
511                 continue
512             if sharenum in shares or not shares:
513                 filename = os.path.join(bucketdir, sharenum_s)
514                 msf = MutableShareFile(filename, self)
515                 datavs[sharenum] = msf.readv(readv)
516         log.msg("returning shares %s" % (datavs.keys(),),
517                 facility="tahoe.storage", level=log.NOISY, parent=lp)
518         self.add_latency("readv", time.time() - start)
519         return datavs
520
521     def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
522                                     reason):
523         fileutil.make_dirs(self.corruption_advisory_dir)
524         now = time_format.iso_utc(sep="T")
525         si_s = base32.b2a(storage_index)
526         # windows can't handle colons in the filename
527         fn = os.path.join(self.corruption_advisory_dir,
528                           "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
529         f = open(fn, "w")
530         f.write("report: Share Corruption\n")
531         f.write("type: %s\n" % share_type)
532         f.write("storage_index: %s\n" % si_s)
533         f.write("share_number: %d\n" % shnum)
534         f.write("\n")
535         f.write(reason)
536         f.write("\n")
537         f.close()
538         log.msg(format=("client claims corruption in (%(share_type)s) " +
539                         "%(si)s-%(shnum)d: %(reason)s"),
540                 share_type=share_type, si=si_s, shnum=shnum, reason=reason,
541                 level=log.SCARY, umid="SGx2fA")
542         return None
543