]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage/server.py
1af9c8901d1a940b18b734b77d895beebd779b3c
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage / server.py
1 import os, re, weakref, struct, time
2
3 from foolscap.api import Referenceable
4 from twisted.application import service
5
6 from zope.interface import implements
7 from allmydata.interfaces import RIStorageServer, IStatsProducer
8 from allmydata.util import fileutil, idlib, log, time_format
9 import allmydata # for __full_version__
10
11 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
12 _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
13 from allmydata.storage.lease import LeaseInfo
14 from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
15      create_mutable_sharefile
16 from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
17 from allmydata.storage.crawler import BucketCountingCrawler
18 from allmydata.storage.expirer import LeaseCheckingCrawler
19
20 # storage/
21 # storage/shares/incoming
22 #   incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
23 #   be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
24 # storage/shares/$START/$STORAGEINDEX
25 # storage/shares/$START/$STORAGEINDEX/$SHARENUM
26
27 # Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
28 # base-32 chars).
29
30 # $SHARENUM matches this regex:
31 NUM_RE=re.compile("^[0-9]+$")
32
33
34
35 class StorageServer(service.MultiService, Referenceable):
36     implements(RIStorageServer, IStatsProducer)
37     name = 'storage'
38     LeaseCheckerClass = LeaseCheckingCrawler
39     windows = False
40
41     try:
42         import win32api, win32con
43         windows = True
44         # <http://msdn.microsoft.com/en-us/library/ms680621%28VS.85%29.aspx>
45         win32api.SetErrorMode(win32con.SEM_FAILCRITICALERRORS |
46                               win32con.SEM_NOOPENFILEERRORBOX)
47     except ImportError:
48         pass
49
50     def __init__(self, storedir, nodeid, reserved_space=0,
51                  discard_storage=False, readonly_storage=False,
52                  stats_provider=None,
53                  expiration_enabled=False,
54                  expiration_mode="age",
55                  expiration_override_lease_duration=None,
56                  expiration_cutoff_date=None,
57                  expiration_sharetypes=("mutable", "immutable")):
58         service.MultiService.__init__(self)
59         assert isinstance(nodeid, str)
60         assert len(nodeid) == 20
61         self.my_nodeid = nodeid
62         self.storedir = storedir
63         sharedir = os.path.join(storedir, "shares")
64         fileutil.make_dirs(sharedir)
65         self.sharedir = sharedir
66         # we don't actually create the corruption-advisory dir until necessary
67         self.corruption_advisory_dir = os.path.join(storedir,
68                                                     "corruption-advisories")
69         self.reserved_space = int(reserved_space)
70         self.no_storage = discard_storage
71         self.readonly_storage = readonly_storage
72         self.stats_provider = stats_provider
73         if self.stats_provider:
74             self.stats_provider.register_producer(self)
75         self.incomingdir = os.path.join(sharedir, 'incoming')
76         self._clean_incomplete()
77         fileutil.make_dirs(self.incomingdir)
78         self._active_writers = weakref.WeakKeyDictionary()
79         log.msg("StorageServer created", facility="tahoe.storage")
80
81         if reserved_space:
82             if self.get_available_space() is None:
83                 log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
84                         umin="0wZ27w", level=log.UNUSUAL)
85
86         self.latencies = {"allocate": [], # immutable
87                           "write": [],
88                           "close": [],
89                           "read": [],
90                           "get": [],
91                           "writev": [], # mutable
92                           "readv": [],
93                           "add-lease": [], # both
94                           "renew": [],
95                           "cancel": [],
96                           }
97         self.add_bucket_counter()
98
99         statefile = os.path.join(self.storedir, "lease_checker.state")
100         historyfile = os.path.join(self.storedir, "lease_checker.history")
101         klass = self.LeaseCheckerClass
102         self.lease_checker = klass(self, statefile, historyfile,
103                                    expiration_enabled, expiration_mode,
104                                    expiration_override_lease_duration,
105                                    expiration_cutoff_date,
106                                    expiration_sharetypes)
107         self.lease_checker.setServiceParent(self)
108
109     def __repr__(self):
110         return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
111
112     def add_bucket_counter(self):
113         statefile = os.path.join(self.storedir, "bucket_counter.state")
114         self.bucket_counter = BucketCountingCrawler(self, statefile)
115         self.bucket_counter.setServiceParent(self)
116
117     def count(self, name, delta=1):
118         if self.stats_provider:
119             self.stats_provider.count("storage_server." + name, delta)
120
121     def add_latency(self, category, latency):
122         a = self.latencies[category]
123         a.append(latency)
124         if len(a) > 1000:
125             self.latencies[category] = a[-1000:]
126
127     def get_latencies(self):
128         """Return a dict, indexed by category, that contains a dict of
129         latency numbers for each category. Each dict will contain the
130         following keys: mean, 01_0_percentile, 10_0_percentile,
131         50_0_percentile (median), 90_0_percentile, 95_0_percentile,
132         99_0_percentile, 99_9_percentile. If no samples have been collected
133         for the given category, then that category name will not be present
134         in the return value."""
135         # note that Amazon's Dynamo paper says they use 99.9% percentile.
136         output = {}
137         for category in self.latencies:
138             if not self.latencies[category]:
139                 continue
140             stats = {}
141             samples = self.latencies[category][:]
142             samples.sort()
143             count = len(samples)
144             stats["mean"] = sum(samples) / count
145             stats["01_0_percentile"] = samples[int(0.01 * count)]
146             stats["10_0_percentile"] = samples[int(0.1 * count)]
147             stats["50_0_percentile"] = samples[int(0.5 * count)]
148             stats["90_0_percentile"] = samples[int(0.9 * count)]
149             stats["95_0_percentile"] = samples[int(0.95 * count)]
150             stats["99_0_percentile"] = samples[int(0.99 * count)]
151             stats["99_9_percentile"] = samples[int(0.999 * count)]
152             output[category] = stats
153         return output
154
155     def log(self, *args, **kwargs):
156         if "facility" not in kwargs:
157             kwargs["facility"] = "tahoe.storage"
158         return log.msg(*args, **kwargs)
159
160     def _clean_incomplete(self):
161         fileutil.rm_dir(self.incomingdir)
162
163     def get_disk_stats(self):
164         """Return disk statistics for the storage disk, in the form of a dict
165         with the following fields.
166           total:            total bytes on disk
167           free_for_root:    bytes actually free on disk
168           free_for_nonroot: bytes free for "a non-privileged user" [Unix] or
169                               the current user [Windows]; might take into
170                               account quotas depending on platform
171           used:             bytes used on disk
172           avail:            bytes available excluding reserved space
173         An AttributeError can occur if the OS has no API to get disk information.
174         An EnvironmentError can occur if the OS call fails."""
175
176         if self.windows:
177             # For Windows systems, where os.statvfs is not available, use GetDiskFreeSpaceEx.
178             # <http://docs.activestate.com/activepython/2.5/pywin32/win32api__GetDiskFreeSpaceEx_meth.html>
179             #
180             # Although the docs say that the argument should be the root directory
181             # of a disk, GetDiskFreeSpaceEx actually accepts any path on that disk
182             # (like its Win32 equivalent).
183
184             (free_for_nonroot, total, free_for_root) = self.win32api.GetDiskFreeSpaceEx(self.storedir)
185         else:
186             # For Unix-like systems.
187             # <http://docs.python.org/library/os.html#os.statvfs>
188             # <http://opengroup.org/onlinepubs/7990989799/xsh/fstatvfs.html>
189             # <http://opengroup.org/onlinepubs/7990989799/xsh/sysstatvfs.h.html>
190             s = os.statvfs(self.storedir)
191
192             # on my mac laptop:
193             #  statvfs(2) is a wrapper around statfs(2).
194             #    statvfs.f_frsize = statfs.f_bsize :
195             #     "minimum unit of allocation" (statvfs)
196             #     "fundamental file system block size" (statfs)
197             #    statvfs.f_bsize = statfs.f_iosize = stat.st_blocks : preferred IO size
198             # on an encrypted home directory ("FileVault"), it gets f_blocks
199             # wrong, and s.f_blocks*s.f_frsize is twice the size of my disk,
200             # but s.f_bavail*s.f_frsize is correct
201
202             total = s.f_frsize * s.f_blocks
203             free_for_root = s.f_frsize * s.f_bfree
204             free_for_nonroot = s.f_frsize * s.f_bavail
205
206         # valid for all platforms:
207         used = total - free_for_root
208         avail = max(free_for_nonroot - self.reserved_space, 0)
209
210         return { 'total': total, 'free_for_root': free_for_root,
211                  'free_for_nonroot': free_for_nonroot,
212                  'used': used, 'avail': avail, }
213
214     def get_stats(self):
215         # remember: RIStatsProvider requires that our return dict
216         # contains numeric values.
217         stats = { 'storage_server.allocated': self.allocated_size(), }
218         stats['storage_server.reserved_space'] = self.reserved_space
219         for category,ld in self.get_latencies().items():
220             for name,v in ld.items():
221                 stats['storage_server.latencies.%s.%s' % (category, name)] = v
222
223         try:
224             disk = self.get_disk_stats()
225             writeable = disk['avail'] > 0
226
227             # spacetime predictors should use disk_avail / (d(disk_used)/dt)
228             stats['storage_server.disk_total'] = disk['total']
229             stats['storage_server.disk_used'] = disk['used']
230             stats['storage_server.disk_free_for_root'] = disk['free_for_root']
231             stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
232             stats['storage_server.disk_avail'] = disk['avail']
233         except AttributeError:
234             writeable = True
235         except EnvironmentError:
236             log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
237             writeable = False
238
239         if self.readonly_storage:
240             stats['storage_server.disk_avail'] = 0
241             writeable = False
242
243         stats['storage_server.accepting_immutable_shares'] = int(writeable)
244         s = self.bucket_counter.get_state()
245         bucket_count = s.get("last-complete-bucket-count")
246         if bucket_count:
247             stats['storage_server.total_bucket_count'] = bucket_count
248         return stats
249
250     def get_available_space(self):
251         """Returns available space for share storage in bytes, or None if no
252         API to get this information is available."""
253
254         if self.readonly_storage:
255             return 0
256         try:
257             return self.get_disk_stats()['avail']
258         except AttributeError:
259             return None
260         except EnvironmentError:
261             log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
262             return 0
263
264     def allocated_size(self):
265         space = 0
266         for bw in self._active_writers:
267             space += bw.allocated_size()
268         return space
269
270     def remote_get_version(self):
271         remaining_space = self.get_available_space()
272         if remaining_space is None:
273             # We're on a platform that has no API to get disk stats.
274             remaining_space = 2**64
275
276         version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
277                     { "maximum-immutable-share-size": remaining_space,
278                       "tolerates-immutable-read-overrun": True,
279                       "delete-mutable-shares-with-zero-length-writev": True,
280                       },
281                     "application-version": str(allmydata.__full_version__),
282                     }
283         return version
284
285     def remote_allocate_buckets(self, storage_index,
286                                 renew_secret, cancel_secret,
287                                 sharenums, allocated_size,
288                                 canary, owner_num=0):
289         # owner_num is not for clients to set, but rather it should be
290         # curried into the PersonalStorageServer instance that is dedicated
291         # to a particular owner.
292         start = time.time()
293         self.count("allocate")
294         alreadygot = set()
295         bucketwriters = {} # k: shnum, v: BucketWriter
296         si_dir = storage_index_to_dir(storage_index)
297         si_s = si_b2a(storage_index)
298
299         log.msg("storage: allocate_buckets %s" % si_s)
300
301         # in this implementation, the lease information (including secrets)
302         # goes into the share files themselves. It could also be put into a
303         # separate database. Note that the lease should not be added until
304         # the BucketWriter has been closed.
305         expire_time = time.time() + 31*24*60*60
306         lease_info = LeaseInfo(owner_num,
307                                renew_secret, cancel_secret,
308                                expire_time, self.my_nodeid)
309
310         max_space_per_bucket = allocated_size
311
312         remaining_space = self.get_available_space()
313         limited = remaining_space is not None
314         if limited:
315             # this is a bit conservative, since some of this allocated_size()
316             # has already been written to disk, where it will show up in
317             # get_available_space.
318             remaining_space -= self.allocated_size()
319         # self.readonly_storage causes remaining_space <= 0
320
321         # fill alreadygot with all shares that we have, not just the ones
322         # they asked about: this will save them a lot of work. Add or update
323         # leases for all of them: if they want us to hold shares for this
324         # file, they'll want us to hold leases for this file.
325         for (shnum, fn) in self._get_bucket_shares(storage_index):
326             alreadygot.add(shnum)
327             sf = ShareFile(fn)
328             sf.add_or_renew_lease(lease_info)
329
330         for shnum in sharenums:
331             incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
332             finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
333             if os.path.exists(finalhome):
334                 # great! we already have it. easy.
335                 pass
336             elif os.path.exists(incominghome):
337                 # Note that we don't create BucketWriters for shnums that
338                 # have a partial share (in incoming/), so if a second upload
339                 # occurs while the first is still in progress, the second
340                 # uploader will use different storage servers.
341                 pass
342             elif (not limited) or (remaining_space >= max_space_per_bucket):
343                 # ok! we need to create the new share file.
344                 bw = BucketWriter(self, incominghome, finalhome,
345                                   max_space_per_bucket, lease_info, canary)
346                 if self.no_storage:
347                     bw.throw_out_all_data = True
348                 bucketwriters[shnum] = bw
349                 self._active_writers[bw] = 1
350                 if limited:
351                     remaining_space -= max_space_per_bucket
352             else:
353                 # bummer! not enough space to accept this bucket
354                 pass
355
356         if bucketwriters:
357             fileutil.make_dirs(os.path.join(self.sharedir, si_dir))
358
359         self.add_latency("allocate", time.time() - start)
360         return alreadygot, bucketwriters
361
362     def _iter_share_files(self, storage_index):
363         for shnum, filename in self._get_bucket_shares(storage_index):
364             f = open(filename, 'rb')
365             header = f.read(32)
366             f.close()
367             if header[:32] == MutableShareFile.MAGIC:
368                 sf = MutableShareFile(filename, self)
369                 # note: if the share has been migrated, the renew_lease()
370                 # call will throw an exception, with information to help the
371                 # client update the lease.
372             elif header[:4] == struct.pack(">L", 1):
373                 sf = ShareFile(filename)
374             else:
375                 continue # non-sharefile
376             yield sf
377
378     def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
379                          owner_num=1):
380         start = time.time()
381         self.count("add-lease")
382         new_expire_time = time.time() + 31*24*60*60
383         lease_info = LeaseInfo(owner_num,
384                                renew_secret, cancel_secret,
385                                new_expire_time, self.my_nodeid)
386         for sf in self._iter_share_files(storage_index):
387             sf.add_or_renew_lease(lease_info)
388         self.add_latency("add-lease", time.time() - start)
389         return None
390
391     def remote_renew_lease(self, storage_index, renew_secret):
392         start = time.time()
393         self.count("renew")
394         new_expire_time = time.time() + 31*24*60*60
395         found_buckets = False
396         for sf in self._iter_share_files(storage_index):
397             found_buckets = True
398             sf.renew_lease(renew_secret, new_expire_time)
399         self.add_latency("renew", time.time() - start)
400         if not found_buckets:
401             raise IndexError("no such lease to renew")
402
403     def remote_cancel_lease(self, storage_index, cancel_secret):
404         start = time.time()
405         self.count("cancel")
406
407         total_space_freed = 0
408         found_buckets = False
409         for sf in self._iter_share_files(storage_index):
410             # note: if we can't find a lease on one share, we won't bother
411             # looking in the others. Unless something broke internally
412             # (perhaps we ran out of disk space while adding a lease), the
413             # leases on all shares will be identical.
414             found_buckets = True
415             # this raises IndexError if the lease wasn't present XXXX
416             total_space_freed += sf.cancel_lease(cancel_secret)
417
418         if found_buckets:
419             storagedir = os.path.join(self.sharedir,
420                                       storage_index_to_dir(storage_index))
421             if not os.listdir(storagedir):
422                 os.rmdir(storagedir)
423
424         if self.stats_provider:
425             self.stats_provider.count('storage_server.bytes_freed',
426                                       total_space_freed)
427         self.add_latency("cancel", time.time() - start)
428         if not found_buckets:
429             raise IndexError("no such storage index")
430
431     def bucket_writer_closed(self, bw, consumed_size):
432         if self.stats_provider:
433             self.stats_provider.count('storage_server.bytes_added', consumed_size)
434         del self._active_writers[bw]
435
436     def _get_bucket_shares(self, storage_index):
437         """Return a list of (shnum, pathname) tuples for files that hold
438         shares for this storage_index. In each tuple, 'shnum' will always be
439         the integer form of the last component of 'pathname'."""
440         storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
441         try:
442             for f in os.listdir(storagedir):
443                 if NUM_RE.match(f):
444                     filename = os.path.join(storagedir, f)
445                     yield (int(f), filename)
446         except OSError:
447             # Commonly caused by there being no buckets at all.
448             pass
449
450     def remote_get_buckets(self, storage_index):
451         start = time.time()
452         self.count("get")
453         si_s = si_b2a(storage_index)
454         log.msg("storage: get_buckets %s" % si_s)
455         bucketreaders = {} # k: sharenum, v: BucketReader
456         for shnum, filename in self._get_bucket_shares(storage_index):
457             bucketreaders[shnum] = BucketReader(self, filename,
458                                                 storage_index, shnum)
459         self.add_latency("get", time.time() - start)
460         return bucketreaders
461
462     def get_leases(self, storage_index):
463         """Provide an iterator that yields all of the leases attached to this
464         bucket. Each lease is returned as a LeaseInfo instance.
465
466         This method is not for client use.
467         """
468
469         # since all shares get the same lease data, we just grab the leases
470         # from the first share
471         try:
472             shnum, filename = self._get_bucket_shares(storage_index).next()
473             sf = ShareFile(filename)
474             return sf.get_leases()
475         except StopIteration:
476             return iter([])
477
478     def remote_slot_testv_and_readv_and_writev(self, storage_index,
479                                                secrets,
480                                                test_and_write_vectors,
481                                                read_vector):
482         start = time.time()
483         self.count("writev")
484         si_s = si_b2a(storage_index)
485         log.msg("storage: slot_writev %s" % si_s)
486         si_dir = storage_index_to_dir(storage_index)
487         (write_enabler, renew_secret, cancel_secret) = secrets
488         # shares exist if there is a file for them
489         bucketdir = os.path.join(self.sharedir, si_dir)
490         shares = {}
491         if os.path.isdir(bucketdir):
492             for sharenum_s in os.listdir(bucketdir):
493                 try:
494                     sharenum = int(sharenum_s)
495                 except ValueError:
496                     continue
497                 filename = os.path.join(bucketdir, sharenum_s)
498                 msf = MutableShareFile(filename, self)
499                 msf.check_write_enabler(write_enabler, si_s)
500                 shares[sharenum] = msf
501         # write_enabler is good for all existing shares.
502
503         # Now evaluate test vectors.
504         testv_is_good = True
505         for sharenum in test_and_write_vectors:
506             (testv, datav, new_length) = test_and_write_vectors[sharenum]
507             if sharenum in shares:
508                 if not shares[sharenum].check_testv(testv):
509                     self.log("testv failed: [%d]: %r" % (sharenum, testv))
510                     testv_is_good = False
511                     break
512             else:
513                 # compare the vectors against an empty share, in which all
514                 # reads return empty strings.
515                 if not EmptyShare().check_testv(testv):
516                     self.log("testv failed (empty): [%d] %r" % (sharenum,
517                                                                 testv))
518                     testv_is_good = False
519                     break
520
521         # now gather the read vectors, before we do any writes
522         read_data = {}
523         for sharenum, share in shares.items():
524             read_data[sharenum] = share.readv(read_vector)
525
526         ownerid = 1 # TODO
527         expire_time = time.time() + 31*24*60*60   # one month
528         lease_info = LeaseInfo(ownerid,
529                                renew_secret, cancel_secret,
530                                expire_time, self.my_nodeid)
531
532         if testv_is_good:
533             # now apply the write vectors
534             for sharenum in test_and_write_vectors:
535                 (testv, datav, new_length) = test_and_write_vectors[sharenum]
536                 if new_length == 0:
537                     if sharenum in shares:
538                         shares[sharenum].unlink()
539                 else:
540                     if sharenum not in shares:
541                         # allocate a new share
542                         allocated_size = 2000 # arbitrary, really
543                         share = self._allocate_slot_share(bucketdir, secrets,
544                                                           sharenum,
545                                                           allocated_size,
546                                                           owner_num=0)
547                         shares[sharenum] = share
548                     shares[sharenum].writev(datav, new_length)
549                     # and update the lease
550                     shares[sharenum].add_or_renew_lease(lease_info)
551
552             if new_length == 0:
553                 # delete empty bucket directories
554                 if not os.listdir(bucketdir):
555                     os.rmdir(bucketdir)
556
557
558         # all done
559         self.add_latency("writev", time.time() - start)
560         return (testv_is_good, read_data)
561
562     def _allocate_slot_share(self, bucketdir, secrets, sharenum,
563                              allocated_size, owner_num=0):
564         (write_enabler, renew_secret, cancel_secret) = secrets
565         my_nodeid = self.my_nodeid
566         fileutil.make_dirs(bucketdir)
567         filename = os.path.join(bucketdir, "%d" % sharenum)
568         share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
569                                          self)
570         return share
571
572     def remote_slot_readv(self, storage_index, shares, readv):
573         start = time.time()
574         self.count("readv")
575         si_s = si_b2a(storage_index)
576         lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
577                      facility="tahoe.storage", level=log.OPERATIONAL)
578         si_dir = storage_index_to_dir(storage_index)
579         # shares exist if there is a file for them
580         bucketdir = os.path.join(self.sharedir, si_dir)
581         if not os.path.isdir(bucketdir):
582             self.add_latency("readv", time.time() - start)
583             return {}
584         datavs = {}
585         for sharenum_s in os.listdir(bucketdir):
586             try:
587                 sharenum = int(sharenum_s)
588             except ValueError:
589                 continue
590             if sharenum in shares or not shares:
591                 filename = os.path.join(bucketdir, sharenum_s)
592                 msf = MutableShareFile(filename, self)
593                 datavs[sharenum] = msf.readv(readv)
594         log.msg("returning shares %s" % (datavs.keys(),),
595                 facility="tahoe.storage", level=log.NOISY, parent=lp)
596         self.add_latency("readv", time.time() - start)
597         return datavs
598
599     def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
600                                     reason):
601         fileutil.make_dirs(self.corruption_advisory_dir)
602         now = time_format.iso_utc(sep="T")
603         si_s = si_b2a(storage_index)
604         # windows can't handle colons in the filename
605         fn = os.path.join(self.corruption_advisory_dir,
606                           "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
607         f = open(fn, "w")
608         f.write("report: Share Corruption\n")
609         f.write("type: %s\n" % share_type)
610         f.write("storage_index: %s\n" % si_s)
611         f.write("share_number: %d\n" % shnum)
612         f.write("\n")
613         f.write(reason)
614         f.write("\n")
615         f.close()
616         log.msg(format=("client claims corruption in (%(share_type)s) " +
617                         "%(si)s-%(shnum)d: %(reason)s"),
618                 share_type=share_type, si=si_s, shnum=shnum, reason=reason,
619                 level=log.SCARY, umid="SGx2fA")
620         return None
621