import os, re, weakref, stat, struct, time

from foolscap import Referenceable
from twisted.application import service

from zope.interface import implements
from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
     RIBucketReader, BadWriteEnablerError, IStatsProducer
from allmydata.util import base32, fileutil, idlib, log, time_format
from allmydata.util.assertutil import precondition
import allmydata # for __version__

class DataTooLargeError(Exception):
    pass

# storage/
# storage/shares/incoming
#   incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
#   be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
# storage/shares/$START/$STORAGEINDEX
# storage/shares/$START/$STORAGEINDEX/$SHARENUM

# Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
# base-32 chars).

# $SHARENUM matches this regex:
NUM_RE=re.compile("^[0-9]+$")

# each share file (in storage/shares/$SI/$SHNUM) contains lease information
# and share data. The share data is accessed by RIBucketWriter.write and
# RIBucketReader.read . The lease information is not accessible through these
# interfaces.

# The share file has the following layout:
#  0x00: share file version number, four bytes, current version is 1
#  0x04: share data length, four bytes big-endian = A # See Footnote 1 below.
#  0x08: number of leases, four bytes big-endian
#  0x0c: beginning of share data (see immutable.layout.WriteBucketProxy)
#  A+0x0c = B: first lease. Lease format is:
#   B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner
#   B+0x04: renew secret, 32 bytes (SHA256)
#   B+0x24: cancel secret, 32 bytes (SHA256)
#   B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch
#   B+0x48: next lease, or end of record

# Footnote 1: as of Tahoe v1.3.0 this field is not used by storage servers,
# but it is still filled in by storage servers in case the storage server
# software gets downgraded from >= Tahoe v1.3.0 to < Tahoe v1.3.0, or the
# share file is moved from one storage server to another. The value stored
# in this field is truncated, so if the actual share data length is >= 2**32,
# then the value stored in this field will be the actual share data length
# modulo 2**32.

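# Editorial sketch (not part of the original module): decoding the fixed
# header described above with the struct module, exactly as ShareFile does
# below. The helper name is hypothetical.
def _example_read_immutable_header(f):
    # returns (version, share data length mod 2**32, number of leases)
    f.seek(0)
    return struct.unpack(">LLL", f.read(0xc))
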
def si_b2a(storageindex):
    return base32.b2a(storageindex)

def si_a2b(ascii_storageindex):
    return base32.a2b(ascii_storageindex)

def storage_index_to_dir(storageindex):
    sia = si_b2a(storageindex)
    return os.path.join(sia[:2], sia)

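# Editorial sketch (not part of the original module): how the functions above
# map a (binary) storage index onto the $START/$STORAGEINDEX layout described
# earlier. The values are illustrative, computed for a 16-byte all-zero index.
#
#   si = "\x00" * 16
#   storage_index_to_dir(si)  # -> "aa/aaaaaaaaaaaaaaaaaaaaaaaaaa"
#   os.path.join("storage/shares", storage_index_to_dir(si), "0")
#                             # -> path of share number 0 for this index
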
class LeaseInfo:
    def __init__(self, owner_num=None, renew_secret=None, cancel_secret=None,
                 expiration_time=None, nodeid=None):
        self.owner_num = owner_num
        self.renew_secret = renew_secret
        self.cancel_secret = cancel_secret
        self.expiration_time = expiration_time
        if nodeid is not None:
            assert isinstance(nodeid, str)
            assert len(nodeid) == 20
        self.nodeid = nodeid

    def from_immutable_data(self, data):
        (self.owner_num,
         self.renew_secret,
         self.cancel_secret,
         self.expiration_time) = struct.unpack(">L32s32sL", data)
        self.nodeid = None
        return self

    def to_immutable_data(self):
        return struct.pack(">L32s32sL",
                           self.owner_num,
                           self.renew_secret, self.cancel_secret,
                           int(self.expiration_time))

    def to_mutable_data(self):
        return struct.pack(">LL32s32s20s",
                           self.owner_num,
                           int(self.expiration_time),
                           self.renew_secret, self.cancel_secret,
                           self.nodeid)

    def from_mutable_data(self, data):
        (self.owner_num,
         self.expiration_time,
         self.renew_secret, self.cancel_secret,
         self.nodeid) = struct.unpack(">LL32s32s20s", data)
        return self


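# Editorial sketch (not part of the original module): a LeaseInfo round-trips
# through its immutable on-disk encoding. The secrets are placeholders.
def _example_lease_roundtrip():
    lease = LeaseInfo(owner_num=0, renew_secret="r"*32,
                      cancel_secret="c"*32, expiration_time=0)
    data = lease.to_immutable_data()
    assert len(data) == struct.calcsize(">L32s32sL") # 72 bytes
    restored = LeaseInfo().from_immutable_data(data)
    assert restored.renew_secret == lease.renew_secret
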
class ShareFile:
    LEASE_SIZE = struct.calcsize(">L32s32sL")

    def __init__(self, filename, max_size=None, create=False):
        """ If max_size is not None then I won't allow more than max_size to
        be written to me. If create=True then max_size must not be None. """
        precondition((max_size is not None) or (not create), max_size, create)
        self.home = filename
        self._max_size = max_size
        if create:
            # touch the file, so later callers will see that we're working on
            # it. Also construct the metadata.
            assert not os.path.exists(self.home)
            fileutil.make_dirs(os.path.dirname(self.home))
            f = open(self.home, 'wb')
            # The second field -- the four-byte share data length -- is no
            # longer used as of Tahoe v1.3.0, but we continue to write it in
            # there in case someone downgrades a storage server from >=
            # Tahoe-1.3.0 to < Tahoe-1.3.0, or moves a share file from one
            # server to another, etc. We do saturation -- a share data length
            # larger than 2**32-1 (what can fit into the field) is marked as
            # the largest length that can fit into the field. That way, even
            # if this does happen, the old < v1.3.0 server will still allow
            # clients to read the first part of the share.
            f.write(struct.pack(">LLL", 1, min(2**32-1, max_size), 0))
            f.close()
            self._lease_offset = max_size + 0x0c
            self._num_leases = 0
        else:
            f = open(self.home, 'rb')
            filesize = os.path.getsize(self.home)
            (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc))
            f.close()
            assert version == 1, version
            self._num_leases = num_leases
            self._lease_offset = filesize - (num_leases * self.LEASE_SIZE)
        self._data_offset = 0xc

    def unlink(self):
        os.unlink(self.home)

    def read_share_data(self, offset, length):
        precondition(offset >= 0)
        # Reads beyond the end of the data are truncated. Reads that start
        # beyond the end of the data return an empty string.
        # I wonder why Python doesn't do the following computation for me?
        seekpos = self._data_offset+offset
        fsize = os.path.getsize(self.home)
        actuallength = max(0, min(length, fsize-seekpos))
        if actuallength == 0:
            return ""
        f = open(self.home, 'rb')
        f.seek(seekpos)
        data = f.read(actuallength)
        f.close()
        return data

    def write_share_data(self, offset, data):
        length = len(data)
        precondition(offset >= 0, offset)
        if self._max_size is not None and offset+length > self._max_size:
            raise DataTooLargeError(self._max_size, offset, length)
        f = open(self.home, 'rb+')
        real_offset = self._data_offset+offset
        f.seek(real_offset)
        assert f.tell() == real_offset
        f.write(data)
        f.close()

    def _write_lease_record(self, f, lease_number, lease_info):
        offset = self._lease_offset + lease_number * self.LEASE_SIZE
        f.seek(offset)
        assert f.tell() == offset
        f.write(lease_info.to_immutable_data())

    def _read_num_leases(self, f):
        f.seek(0x08)
        (num_leases,) = struct.unpack(">L", f.read(4))
        return num_leases

    def _write_num_leases(self, f, num_leases):
        f.seek(0x08)
        f.write(struct.pack(">L", num_leases))

    def _truncate_leases(self, f, num_leases):
        f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE)

    def iter_leases(self):
        """Yields LeaseInfo instances for all leases."""
        f = open(self.home, 'rb')
        (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc))
        f.seek(self._lease_offset)
        for i in range(num_leases):
            data = f.read(self.LEASE_SIZE)
            if data:
                yield LeaseInfo().from_immutable_data(data)

    def add_lease(self, lease_info):
        f = open(self.home, 'rb+')
        num_leases = self._read_num_leases(f)
        self._write_lease_record(f, num_leases, lease_info)
        self._write_num_leases(f, num_leases+1)
        f.close()

    def renew_lease(self, renew_secret, new_expire_time):
        for i,lease in enumerate(self.iter_leases()):
            if lease.renew_secret == renew_secret:
                # yup. See if we need to update the owner time.
                if new_expire_time > lease.expiration_time:
                    # yes
                    lease.expiration_time = new_expire_time
                    f = open(self.home, 'rb+')
                    self._write_lease_record(f, i, lease)
                    f.close()
                return
        raise IndexError("unable to renew non-existent lease")

    def add_or_renew_lease(self, lease_info):
        try:
            self.renew_lease(lease_info.renew_secret,
                             lease_info.expiration_time)
        except IndexError:
            self.add_lease(lease_info)

    def cancel_lease(self, cancel_secret):
        """Remove a lease with the given cancel_secret. If the last lease is
        cancelled, the file will be removed. Return the number of bytes that
        were freed (by truncating the list of leases, and possibly by
        deleting the file). Raise IndexError if there was no lease with the
        given cancel_secret.
        """

        leases = list(self.iter_leases())
        num_leases_removed = 0
        for i,lease in enumerate(leases):
            if lease.cancel_secret == cancel_secret:
                leases[i] = None
                num_leases_removed += 1
        if not num_leases_removed:
            raise IndexError("unable to find matching lease to cancel")
        # pack and write out the remaining leases. We write these out in
        # the same order as they were added, so that if we crash while
        # doing this, we won't lose any non-cancelled leases.
        leases = [l for l in leases if l] # remove the cancelled leases
        f = open(self.home, 'rb+')
        for i,lease in enumerate(leases):
            self._write_lease_record(f, i, lease)
        self._write_num_leases(f, len(leases))
        self._truncate_leases(f, len(leases))
        f.close()
        space_freed = self.LEASE_SIZE * num_leases_removed
        if not len(leases):
            space_freed += os.stat(self.home)[stat.ST_SIZE]
            self.unlink()
        return space_freed


class BucketWriter(Referenceable):
    implements(RIBucketWriter)

    def __init__(self, ss, incominghome, finalhome, max_size, lease_info, canary):
        self.ss = ss
        self.incominghome = incominghome
        self.finalhome = finalhome
        self._max_size = max_size # don't allow the client to write more than this
        self._canary = canary
        self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)
        self.closed = False
        self.throw_out_all_data = False
        self._sharefile = ShareFile(incominghome, create=True, max_size=max_size)
        # also, add our lease to the file now, so that other ones can be
        # added by simultaneous uploaders
        self._sharefile.add_lease(lease_info)

    def allocated_size(self):
        return self._max_size

    def remote_write(self, offset, data):
        start = time.time()
        precondition(not self.closed)
        if self.throw_out_all_data:
            return
        self._sharefile.write_share_data(offset, data)
        self.ss.add_latency("write", time.time() - start)
        self.ss.count("write")

    def remote_close(self):
        precondition(not self.closed)
        start = time.time()

        fileutil.make_dirs(os.path.dirname(self.finalhome))
        fileutil.rename(self.incominghome, self.finalhome)
        try:
            # self.incominghome is like storage/shares/incoming/ab/abcde/4 .
            # We try to delete the parent (.../ab/abcde) to avoid leaving
            # these directories lying around forever, but the delete might
            # fail if we're working on another share for the same storage
            # index (like ab/abcde/5). The alternative approach would be to
            # use a hierarchy of objects (PrefixHolder, BucketHolder,
            # ShareWriter), each of which is responsible for a single
            # directory on disk, and have them use reference counting of
            # their children to know when they should do the rmdir. This
            # approach is simpler, but relies on os.rmdir refusing to delete
            # a non-empty directory. Do *not* use fileutil.rm_dir() here!
            os.rmdir(os.path.dirname(self.incominghome))
            # we also delete the grandparent (prefix) directory, .../ab ,
            # again to avoid leaving directories lying around. This might
            # fail if there is another bucket open that shares a prefix (like
            # ab/abfff).
            os.rmdir(os.path.dirname(os.path.dirname(self.incominghome)))
            # we leave the great-grandparent (incoming/) directory in place.
        except EnvironmentError:
            # ignore the "can't rmdir because the directory is not empty"
            # exceptions, those are normal consequences of the
            # above-mentioned conditions.
            pass
        self._sharefile = None
        self.closed = True
        self._canary.dontNotifyOnDisconnect(self._disconnect_marker)

        filelen = os.stat(self.finalhome)[stat.ST_SIZE]
        self.ss.bucket_writer_closed(self, filelen)
        self.ss.add_latency("close", time.time() - start)
        self.ss.count("close")

    def _disconnected(self):
        if not self.closed:
            self._abort()

    def remote_abort(self):
        log.msg("storage: aborting sharefile %s" % self.incominghome,
                facility="tahoe.storage", level=log.UNUSUAL)
        if not self.closed:
            self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
        self._abort()
        self.ss.count("abort")

    def _abort(self):
        if self.closed:
            return
        os.remove(self.incominghome)
        # if we were the last share to be moved, remove the incoming/
        # directory that was our parent
        parentdir = os.path.split(self.incominghome)[0]
        if not os.listdir(parentdir):
            os.rmdir(parentdir)


class BucketReader(Referenceable):
    implements(RIBucketReader)

    def __init__(self, ss, sharefname, storage_index=None, shnum=None):
        self.ss = ss
        self._share_file = ShareFile(sharefname)
        self.storage_index = storage_index
        self.shnum = shnum

    def __repr__(self):
        return "<%s %s %s>" % (self.__class__.__name__,
                               base32.b2a_l(self.storage_index[:8], 60),
                               self.shnum)

    def remote_read(self, offset, length):
        start = time.time()
        data = self._share_file.read_share_data(offset, length)
        self.ss.add_latency("read", time.time() - start)
        self.ss.count("read")
        return data

    def remote_advise_corrupt_share(self, reason):
        return self.ss.remote_advise_corrupt_share("immutable",
                                                   self.storage_index,
                                                   self.shnum,
                                                   reason)

# the MutableShareFile is like the ShareFile, but used for mutable data. It
# has a different layout. See docs/mutable.txt for more details.

# #   offset    size    name
# 1   0         32      magic verstr "tahoe mutable container v1" plus binary
# 2   32        20      write enabler's nodeid
# 3   52        32      write enabler
# 4   84        8       data size (actual share data present) (a)
# 5   92        8       offset of (8) count of extra leases (after data)
# 6   100       368     four leases, 92 bytes each
#                        0    4   ownerid (0 means "no lease here")
#                        4    4   expiration timestamp
#                        8   32   renewal token
#                        40  32   cancel token
#                        72  20   nodeid which accepted the tokens
# 7   468       (a)     data
# 8   ??        4       count of extra leases
# 9   ??        n*92    extra leases


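# Editorial sketch (not part of the original module): the fixed offsets in the
# table above, derived with struct.calcsize() just as MutableShareFile does.
def _example_mutable_offsets():
    header = struct.calcsize(">32s20s32sQQ") # rows 1-5: 32+20+32+8+8 = 100
    lease = struct.calcsize(">LL32s32s20s")  # one lease slot: 4+4+32+32+20 = 92
    assert header + 4*lease == 468           # row 7: data starts at offset 468
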
assert struct.calcsize(">L") == 4 # the struct module docs say big-endian L is 4 bytes
assert struct.calcsize(">Q") == 8 # the struct module docs say big-endian Q is 8 bytes

class MutableShareFile:

    DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
    EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
    HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
    LEASE_SIZE = struct.calcsize(">LL32s32s20s")
    assert LEASE_SIZE == 92
    DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
    assert DATA_OFFSET == 468, DATA_OFFSET
    # our sharefiles start with a recognizable string, plus some random
    # binary data to reduce the chance that a regular text file will look
    # like a sharefile.
    MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
    assert len(MAGIC) == 32
    MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary
    # TODO: decide upon a policy for max share size

    def __init__(self, filename, parent=None):
        self.home = filename
        if os.path.exists(self.home):
            # we don't cache anything, just check the magic
            f = open(self.home, 'rb')
            data = f.read(self.HEADER_SIZE)
            (magic,
             write_enabler_nodeid, write_enabler,
             data_length, extra_lease_offset) = \
             struct.unpack(">32s20s32sQQ", data)
            assert magic == self.MAGIC
        self.parent = parent # for logging

    def log(self, *args, **kwargs):
        return self.parent.log(*args, **kwargs)

    def create(self, my_nodeid, write_enabler):
        assert not os.path.exists(self.home)
        data_length = 0
        extra_lease_offset = (self.HEADER_SIZE
                              + 4 * self.LEASE_SIZE
                              + data_length)
        assert extra_lease_offset == self.DATA_OFFSET # true at creation
        num_extra_leases = 0
        f = open(self.home, 'wb')
        header = struct.pack(">32s20s32sQQ",
                             self.MAGIC, my_nodeid, write_enabler,
                             data_length, extra_lease_offset,
                             )
        leases = ("\x00"*self.LEASE_SIZE) * 4
        f.write(header + leases)
        # data goes here, empty after creation
        f.write(struct.pack(">L", num_extra_leases))
        # extra leases go here, none at creation
        f.close()

    def unlink(self):
        os.unlink(self.home)

    def _read_data_length(self, f):
        f.seek(self.DATA_LENGTH_OFFSET)
        (data_length,) = struct.unpack(">Q", f.read(8))
        return data_length

    def _write_data_length(self, f, data_length):
        f.seek(self.DATA_LENGTH_OFFSET)
        f.write(struct.pack(">Q", data_length))

    def _read_share_data(self, f, offset, length):
        precondition(offset >= 0)
        data_length = self._read_data_length(f)
        if offset+length > data_length:
            # reads beyond the end of the data are truncated. Reads that
            # start beyond the end of the data return an empty string.
            length = max(0, data_length-offset)
        if length == 0:
            return ""
        precondition(offset+length <= data_length)
        f.seek(self.DATA_OFFSET+offset)
        data = f.read(length)
        return data

    def _read_extra_lease_offset(self, f):
        f.seek(self.EXTRA_LEASE_OFFSET)
        (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
        return extra_lease_offset

    def _write_extra_lease_offset(self, f, offset):
        f.seek(self.EXTRA_LEASE_OFFSET)
        f.write(struct.pack(">Q", offset))

    def _read_num_extra_leases(self, f):
        offset = self._read_extra_lease_offset(f)
        f.seek(offset)
        (num_extra_leases,) = struct.unpack(">L", f.read(4))
        return num_extra_leases

    def _write_num_extra_leases(self, f, num_leases):
        extra_lease_offset = self._read_extra_lease_offset(f)
        f.seek(extra_lease_offset)
        f.write(struct.pack(">L", num_leases))

    def _change_container_size(self, f, new_container_size):
        if new_container_size > self.MAX_SIZE:
            raise DataTooLargeError()
        old_extra_lease_offset = self._read_extra_lease_offset(f)
        new_extra_lease_offset = self.DATA_OFFSET + new_container_size
        if new_extra_lease_offset < old_extra_lease_offset:
            # TODO: allow containers to shrink. For now they remain large.
            return
        num_extra_leases = self._read_num_extra_leases(f)
        f.seek(old_extra_lease_offset)
        extra_lease_data = f.read(4 + num_extra_leases * self.LEASE_SIZE)
        f.seek(new_extra_lease_offset)
        f.write(extra_lease_data)
        # an interrupt here will corrupt the leases, iff the move caused the
        # extra leases to overlap.
        self._write_extra_lease_offset(f, new_extra_lease_offset)

    def _write_share_data(self, f, offset, data):
        length = len(data)
        precondition(offset >= 0)
        data_length = self._read_data_length(f)
        extra_lease_offset = self._read_extra_lease_offset(f)

        if offset+length >= data_length:
            # They are expanding their data size.
            if self.DATA_OFFSET+offset+length > extra_lease_offset:
                # Their new data won't fit in the current container, so we
                # have to move the leases. With luck, they're expanding it
                # more than the size of the extra lease block, which will
                # minimize the corrupt-the-share window
                self._change_container_size(f, offset+length)
                extra_lease_offset = self._read_extra_lease_offset(f)

                # an interrupt here is ok.. the container has been enlarged
                # but the data remains untouched

            assert self.DATA_OFFSET+offset+length <= extra_lease_offset
            # Their data now fits in the current container. We must write
            # their new data and modify the recorded data size.
            new_data_length = offset+length
            self._write_data_length(f, new_data_length)
            # an interrupt here will result in a corrupted share

        # now all that's left to do is write out their data
        f.seek(self.DATA_OFFSET+offset)
        f.write(data)
        return

    def _write_lease_record(self, f, lease_number, lease_info):
        extra_lease_offset = self._read_extra_lease_offset(f)
        num_extra_leases = self._read_num_extra_leases(f)
        if lease_number < 4:
            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
        elif (lease_number-4) < num_extra_leases:
            offset = (extra_lease_offset
                      + 4
                      + (lease_number-4)*self.LEASE_SIZE)
        else:
            # must add an extra lease record
            self._write_num_extra_leases(f, num_extra_leases+1)
            offset = (extra_lease_offset
                      + 4
                      + (lease_number-4)*self.LEASE_SIZE)
        f.seek(offset)
        assert f.tell() == offset
        f.write(lease_info.to_mutable_data())

    def _read_lease_record(self, f, lease_number):
        # returns a LeaseInfo instance, or None
        extra_lease_offset = self._read_extra_lease_offset(f)
        num_extra_leases = self._read_num_extra_leases(f)
        if lease_number < 4:
            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
        elif (lease_number-4) < num_extra_leases:
            offset = (extra_lease_offset
                      + 4
                      + (lease_number-4)*self.LEASE_SIZE)
        else:
            raise IndexError("No such lease number %d" % lease_number)
        f.seek(offset)
        assert f.tell() == offset
        data = f.read(self.LEASE_SIZE)
        lease_info = LeaseInfo().from_mutable_data(data)
        if lease_info.owner_num == 0:
            return None
        return lease_info

    def _get_num_lease_slots(self, f):
        # how many places do we have allocated for leases? Not all of them
        # are filled.
        num_extra_leases = self._read_num_extra_leases(f)
        return 4+num_extra_leases

    def _get_first_empty_lease_slot(self, f):
        # return an int with the index of an empty slot, or None if we do not
        # currently have an empty slot

        for i in range(self._get_num_lease_slots(f)):
            if self._read_lease_record(f, i) is None:
                return i
        return None

    def _enumerate_leases(self, f):
        """Yields (leasenum, LeaseInfo) for all occupied lease slots."""
        for i in range(self._get_num_lease_slots(f)):
            try:
                data = self._read_lease_record(f, i)
                if data is not None:
                    yield (i,data)
            except IndexError:
                return

    def debug_get_leases(self):
        f = open(self.home, 'rb')
        leases = list(self._enumerate_leases(f))
        f.close()
        return leases

    def add_lease(self, lease_info):
        precondition(lease_info.owner_num != 0) # 0 means "no lease here"
        f = open(self.home, 'rb+')
        num_lease_slots = self._get_num_lease_slots(f)
        empty_slot = self._get_first_empty_lease_slot(f)
        if empty_slot is not None:
            self._write_lease_record(f, empty_slot, lease_info)
        else:
            self._write_lease_record(f, num_lease_slots, lease_info)
        f.close()

    def renew_lease(self, renew_secret, new_expire_time):
        accepting_nodeids = set()
        f = open(self.home, 'rb+')
        for (leasenum,lease) in self._enumerate_leases(f):
            if lease.renew_secret == renew_secret:
                # yup. See if we need to update the owner time.
                if new_expire_time > lease.expiration_time:
                    # yes
                    lease.expiration_time = new_expire_time
                    self._write_lease_record(f, leasenum, lease)
                f.close()
                return
            accepting_nodeids.add(lease.nodeid)
        f.close()
        # Return the accepting_nodeids set, to give the client a chance to
        # update the leases on a share which has been migrated from its
        # original server to a new one.
        msg = ("Unable to renew non-existent lease. I have leases accepted by"
               " nodeids: ")
        msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
                         for anid in accepting_nodeids])
        msg += " ."
        raise IndexError(msg)

    def add_or_renew_lease(self, lease_info):
        precondition(lease_info.owner_num != 0) # 0 means "no lease here"
        try:
            self.renew_lease(lease_info.renew_secret,
                             lease_info.expiration_time)
        except IndexError:
            self.add_lease(lease_info)

    def cancel_lease(self, cancel_secret):
        """Remove any leases with the given cancel_secret. If the last lease
        is cancelled, the file will be removed. Return the number of bytes
        that were freed (by truncating the list of leases, and possibly by
        deleting the file). Raise IndexError if there was no lease with the
        given cancel_secret."""

        accepting_nodeids = set()
        modified = 0
        remaining = 0
        blank_lease = LeaseInfo(owner_num=0,
                                renew_secret="\x00"*32,
                                cancel_secret="\x00"*32,
                                expiration_time=0,
                                nodeid="\x00"*20)
        f = open(self.home, 'rb+')
        for (leasenum,lease) in self._enumerate_leases(f):
            accepting_nodeids.add(lease.nodeid)
            if lease.cancel_secret == cancel_secret:
                self._write_lease_record(f, leasenum, blank_lease)
                modified += 1
            else:
                remaining += 1
        if modified:
            freed_space = self._pack_leases(f)
            f.close()
            if not remaining:
                freed_space += os.stat(self.home)[stat.ST_SIZE]
                self.unlink()
            return freed_space

        f.close()
        msg = ("Unable to cancel non-existent lease. I have leases "
               "accepted by nodeids: ")
        msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
                         for anid in accepting_nodeids])
        msg += " ."
        raise IndexError(msg)

    def _pack_leases(self, f):
        # TODO: reclaim space from cancelled leases
        return 0

    def _read_write_enabler_and_nodeid(self, f):
        f.seek(0)
        data = f.read(self.HEADER_SIZE)
        (magic,
         write_enabler_nodeid, write_enabler,
         data_length, extra_lease_offset) = \
         struct.unpack(">32s20s32sQQ", data)
        assert magic == self.MAGIC
        return (write_enabler, write_enabler_nodeid)

    def readv(self, readv):
        datav = []
        f = open(self.home, 'rb')
        for (offset, length) in readv:
            datav.append(self._read_share_data(f, offset, length))
        f.close()
        return datav

#    def remote_get_length(self):
#        f = open(self.home, 'rb')
#        data_length = self._read_data_length(f)
#        f.close()
#        return data_length

    def check_write_enabler(self, write_enabler, si_s):
        f = open(self.home, 'rb+')
        (real_write_enabler, write_enabler_nodeid) = \
                             self._read_write_enabler_and_nodeid(f)
        f.close()
        if write_enabler != real_write_enabler:
            # accommodate share migration by reporting the nodeid used for
            # the old write enabler.
            self.log(format="bad write enabler on SI %(si)s,"
                     " recorded by nodeid %(nodeid)s",
                     facility="tahoe.storage",
                     level=log.WEIRD, umid="cE1eBQ",
                     si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
            msg = "The write enabler was recorded by nodeid '%s'." % \
                  (idlib.nodeid_b2a(write_enabler_nodeid),)
            raise BadWriteEnablerError(msg)

    def check_testv(self, testv):
        test_good = True
        f = open(self.home, 'rb+')
        for (offset, length, operator, specimen) in testv:
            data = self._read_share_data(f, offset, length)
            if not testv_compare(data, operator, specimen):
                test_good = False
                break
        f.close()
        return test_good

    def writev(self, datav, new_length):
        f = open(self.home, 'rb+')
        for (offset, data) in datav:
            self._write_share_data(f, offset, data)
        if new_length is not None:
            self._change_container_size(f, new_length)
            f.seek(self.DATA_LENGTH_OFFSET)
            f.write(struct.pack(">Q", new_length))
        f.close()

def testv_compare(a, op, b):
    assert op in ("lt", "le", "eq", "ne", "ge", "gt")
    if op == "lt":
        return a < b
    if op == "le":
        return a <= b
    if op == "eq":
        return a == b
    if op == "ne":
        return a != b
    if op == "ge":
        return a >= b
    if op == "gt":
        return a > b
    # never reached

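# Editorial sketch (not part of the original module): test vectors implement
# the test-and-set step of the mutable-write protocol; e.g. "write only if
# the share is still empty".
def _example_testv():
    assert testv_compare("", "eq", "")        # an empty share passes
    assert not testv_compare("old", "eq", "") # existing data fails the test
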
class EmptyShare:

    def check_testv(self, testv):
        test_good = True
        for (offset, length, operator, specimen) in testv:
            data = ""
            if not testv_compare(data, operator, specimen):
                test_good = False
                break
        return test_good

def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
    ms = MutableShareFile(filename, parent)
    ms.create(my_nodeid, write_enabler)
    del ms
    return MutableShareFile(filename, parent)


class StorageServer(service.MultiService, Referenceable):
    implements(RIStorageServer, IStatsProducer)
    name = 'storage'

    def __init__(self, storedir, reserved_space=0,
                 discard_storage=False, readonly_storage=False,
                 stats_provider=None):
        service.MultiService.__init__(self)
        self.storedir = storedir
        sharedir = os.path.join(storedir, "shares")
        fileutil.make_dirs(sharedir)
        self.sharedir = sharedir
        # we don't actually create the corruption-advisory dir until necessary
        self.corruption_advisory_dir = os.path.join(storedir,
                                                    "corruption-advisories")
        self.reserved_space = int(reserved_space)
        self.no_storage = discard_storage
        self.readonly_storage = readonly_storage
        self.stats_provider = stats_provider
        if self.stats_provider:
            self.stats_provider.register_producer(self)
        self.incomingdir = os.path.join(sharedir, 'incoming')
        self._clean_incomplete()
        fileutil.make_dirs(self.incomingdir)
        self._active_writers = weakref.WeakKeyDictionary()
        log.msg("StorageServer created", facility="tahoe.storage")

        if reserved_space:
            if self.get_available_space() is None:
                log.msg("warning: [storage]reserved_space= is set, but this "
                        "platform does not support statvfs(2), so this "
                        "reservation cannot be honored",
                        umid="0wZ27w", level=log.UNUSUAL)

        self.latencies = {"allocate": [], # immutable
                          "write": [],
                          "close": [],
                          "read": [],
                          "get": [],
                          "writev": [], # mutable
                          "readv": [],
                          "add-lease": [], # both
                          "renew": [],
                          "cancel": [],
                          }

    def count(self, name, delta=1):
        if self.stats_provider:
            self.stats_provider.count("storage_server." + name, delta)

    def add_latency(self, category, latency):
        a = self.latencies[category]
        a.append(latency)
        if len(a) > 1000:
            self.latencies[category] = a[-1000:]

    def get_latencies(self):
        """Return a dict, indexed by category, that contains a dict of
        latency numbers for each category. Each dict will contain the
        following keys: mean, 01_0_percentile, 10_0_percentile,
        50_0_percentile (median), 90_0_percentile, 95_0_percentile,
        99_0_percentile, 99_9_percentile. If no samples have been collected
        for the given category, then that category name will not be present
        in the return value."""
        # note that Amazon's Dynamo paper says they use the 99.9th percentile.
        output = {}
        for category in self.latencies:
            if not self.latencies[category]:
                continue
            stats = {}
            samples = self.latencies[category][:]
            samples.sort()
            count = len(samples)
            stats["mean"] = sum(samples) / count
            stats["01_0_percentile"] = samples[int(0.01 * count)]
            stats["10_0_percentile"] = samples[int(0.1 * count)]
            stats["50_0_percentile"] = samples[int(0.5 * count)]
            stats["90_0_percentile"] = samples[int(0.9 * count)]
            stats["95_0_percentile"] = samples[int(0.95 * count)]
            stats["99_0_percentile"] = samples[int(0.99 * count)]
            stats["99_9_percentile"] = samples[int(0.999 * count)]
            output[category] = stats
        return output

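    # Editorial note (not part of the original module): with the 1000-sample
    # cap imposed by add_latency(), the indexing above picks samples[10] for
    # 01_0_percentile, samples[500] for the median, and samples[999] for
    # 99_9_percentile.
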
    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.storage"
        return log.msg(*args, **kwargs)

    def setNodeID(self, nodeid):
        # somebody must set this before any slots can be created or leases
        # added
        self.my_nodeid = nodeid

    def startService(self):
        service.MultiService.startService(self)
        if self.parent:
            nodeid = self.parent.nodeid # 20 bytes, binary
            assert len(nodeid) == 20
            self.setNodeID(nodeid)

    def _clean_incomplete(self):
        fileutil.rm_dir(self.incomingdir)

    def get_stats(self):
        # remember: RIStatsProvider requires that our return dict
        # contains numeric values.
        stats = { 'storage_server.allocated': self.allocated_size(), }
        for category,ld in self.get_latencies().items():
            for name,v in ld.items():
                stats['storage_server.latencies.%s.%s' % (category, name)] = v
        writeable = True
        if self.readonly_storage:
            writeable = False
        try:
            s = os.statvfs(self.storedir)
            disk_total = s.f_bsize * s.f_blocks
            disk_used = s.f_bsize * (s.f_blocks - s.f_bfree)
            # spacetime predictors should look at the slope of disk_used.
            disk_avail = s.f_bsize * s.f_bavail # available to non-root users
            # include our local policy here: if we stop accepting shares when
            # the available space drops below 1GB, then include that fact in
            # disk_avail.
            disk_avail -= self.reserved_space
            disk_avail = max(disk_avail, 0)
            if self.readonly_storage:
                disk_avail = 0
            if disk_avail == 0:
                writeable = False

            # spacetime predictors should use disk_avail / (d(disk_used)/dt)
            stats["storage_server.disk_total"] = disk_total
            stats["storage_server.disk_used"] = disk_used
            stats["storage_server.disk_avail"] = disk_avail
        except AttributeError:
            # os.statvfs is available only on unix
            pass
        stats["storage_server.accepting_immutable_shares"] = int(writeable)
        return stats

    def stat_disk(self, d):
        s = os.statvfs(d)
        # s.f_bavail: available to non-root users
        disk_avail = s.f_bsize * s.f_bavail
        return disk_avail

    def get_available_space(self):
        # returns None if it cannot be measured (windows)
        try:
            disk_avail = self.stat_disk(self.storedir)
            disk_avail -= self.reserved_space
        except AttributeError:
            disk_avail = None
        if self.readonly_storage:
            disk_avail = 0
        return disk_avail

    def allocated_size(self):
        space = 0
        for bw in self._active_writers:
            space += bw.allocated_size()
        return space

    def remote_get_version(self):
        remaining_space = self.get_available_space()
        if remaining_space is None:
            # we're on a platform that doesn't have 'df', so make a vague
            # guess.
            remaining_space = 2**64
        version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                    { "maximum-immutable-share-size": remaining_space,
                      "tolerates-immutable-read-overrun": True,
                      "delete-mutable-shares-with-zero-length-writev": True,
                      },
                    "application-version": str(allmydata.__version__),
                    }
        return version

    def remote_allocate_buckets(self, storage_index,
                                renew_secret, cancel_secret,
                                sharenums, allocated_size,
                                canary, owner_num=0):
        # owner_num is not for clients to set, but rather it should be
        # curried into the PersonalStorageServer instance that is dedicated
        # to a particular owner.
        start = time.time()
        self.count("allocate")
        alreadygot = set()
        bucketwriters = {} # k: shnum, v: BucketWriter
        si_dir = storage_index_to_dir(storage_index)
        si_s = si_b2a(storage_index)

        log.msg("storage: allocate_buckets %s" % si_s)

        # in this implementation, the lease information (including secrets)
        # goes into the share files themselves. It could also be put into a
        # separate database. Note that the lease should not be added until
        # the BucketWriter has been closed.
        expire_time = time.time() + 31*24*60*60
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        max_space_per_bucket = allocated_size

        remaining_space = self.get_available_space()
        limited = remaining_space is not None
        if limited:
            # this is a bit conservative, since some of this allocated_size()
            # has already been written to disk, where it will show up in
            # get_available_space.
            remaining_space -= self.allocated_size()

        # fill alreadygot with all shares that we have, not just the ones
        # they asked about: this will save them a lot of work. Add or update
        # leases for all of them: if they want us to hold shares for this
        # file, they'll want us to hold leases for this file.
        for (shnum, fn) in self._get_bucket_shares(storage_index):
            alreadygot.add(shnum)
            sf = ShareFile(fn)
            sf.add_or_renew_lease(lease_info)

        # self.readonly_storage causes remaining_space=0

        for shnum in sharenums:
            incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
            finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
            if os.path.exists(finalhome):
                # great! we already have it. easy.
                pass
            elif os.path.exists(incominghome):
                # Note that we don't create BucketWriters for shnums that
                # have a partial share (in incoming/), so if a second upload
                # occurs while the first is still in progress, the second
                # uploader will use different storage servers.
                pass
            elif (not limited) or (remaining_space >= max_space_per_bucket):
                # ok! we need to create the new share file.
                bw = BucketWriter(self, incominghome, finalhome,
                                  max_space_per_bucket, lease_info, canary)
                if self.no_storage:
                    bw.throw_out_all_data = True
                bucketwriters[shnum] = bw
                self._active_writers[bw] = 1
                if limited:
                    remaining_space -= max_space_per_bucket
            else:
                # bummer! not enough space to accept this bucket
                pass

        if bucketwriters:
            fileutil.make_dirs(os.path.join(self.sharedir, si_dir))

        self.add_latency("allocate", time.time() - start)
        return alreadygot, bucketwriters

    def _iter_share_files(self, storage_index):
        for shnum, filename in self._get_bucket_shares(storage_index):
            f = open(filename, 'rb')
            header = f.read(32)
            f.close()
            if header[:32] == MutableShareFile.MAGIC:
                sf = MutableShareFile(filename, self)
                # note: if the share has been migrated, the renew_lease()
                # call will throw an exception, with information to help the
                # client update the lease.
            elif header[:4] == struct.pack(">L", 1):
                sf = ShareFile(filename)
            else:
                continue # non-sharefile
            yield sf

    def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
                         owner_num=1):
        start = time.time()
        self.count("add-lease")
        new_expire_time = time.time() + 31*24*60*60
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               new_expire_time, self.my_nodeid)
        found_buckets = False
        for sf in self._iter_share_files(storage_index):
            found_buckets = True
            # note: if the share has been migrated, the renew_lease()
            # call will throw an exception, with information to help the
            # client update the lease.
            sf.add_or_renew_lease(lease_info)
        self.add_latency("add-lease", time.time() - start)
        if not found_buckets:
            raise IndexError("no such storage index to do add-lease")

    def remote_renew_lease(self, storage_index, renew_secret):
        start = time.time()
        self.count("renew")
        new_expire_time = time.time() + 31*24*60*60
        found_buckets = False
        for sf in self._iter_share_files(storage_index):
            found_buckets = True
            sf.renew_lease(renew_secret, new_expire_time)
        self.add_latency("renew", time.time() - start)
        if not found_buckets:
            raise IndexError("no such lease to renew")

    def remote_cancel_lease(self, storage_index, cancel_secret):
        start = time.time()
        self.count("cancel")

        total_space_freed = 0
        found_buckets = False
        for sf in self._iter_share_files(storage_index):
            # note: if we can't find a lease on one share, we won't bother
            # looking in the others. Unless something broke internally
            # (perhaps we ran out of disk space while adding a lease), the
            # leases on all shares will be identical.
            found_buckets = True
            # this raises IndexError if the lease wasn't present XXXX
            total_space_freed += sf.cancel_lease(cancel_secret)

        if found_buckets:
            storagedir = os.path.join(self.sharedir,
                                      storage_index_to_dir(storage_index))
            if not os.listdir(storagedir):
                os.rmdir(storagedir)

        if self.stats_provider:
            self.stats_provider.count('storage_server.bytes_freed',
                                      total_space_freed)
        self.add_latency("cancel", time.time() - start)
        if not found_buckets:
            raise IndexError("no such storage index")

    def bucket_writer_closed(self, bw, consumed_size):
        if self.stats_provider:
            self.stats_provider.count('storage_server.bytes_added', consumed_size)
        del self._active_writers[bw]

    def _get_bucket_shares(self, storage_index):
        """Yield (shnum, pathname) tuples for the files that hold shares for
        this storage_index. In each tuple, 'shnum' will always be the
        integer form of the last component of 'pathname'."""
        storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
        try:
            for f in os.listdir(storagedir):
                if NUM_RE.match(f):
                    filename = os.path.join(storagedir, f)
                    yield (int(f), filename)
        except OSError:
            # Commonly caused by there being no buckets at all.
            pass

    def remote_get_buckets(self, storage_index):
        start = time.time()
        self.count("get")
        si_s = si_b2a(storage_index)
        log.msg("storage: get_buckets %s" % si_s)
        bucketreaders = {} # k: sharenum, v: BucketReader
        for shnum, filename in self._get_bucket_shares(storage_index):
            bucketreaders[shnum] = BucketReader(self, filename,
                                                storage_index, shnum)
        self.add_latency("get", time.time() - start)
        return bucketreaders

    def get_leases(self, storage_index):
        """Provide an iterator that yields all of the leases attached to this
        bucket. Each lease is returned as a LeaseInfo instance.

        This method is not for client use.
        """

        # since all shares get the same lease data, we just grab the leases
        # from the first share
        try:
            shnum, filename = self._get_bucket_shares(storage_index).next()
            sf = ShareFile(filename)
            return sf.iter_leases()
        except StopIteration:
            return iter([])

    def remote_slot_testv_and_readv_and_writev(self, storage_index,
                                               secrets,
                                               test_and_write_vectors,
                                               read_vector):
        start = time.time()
        self.count("writev")
        si_s = si_b2a(storage_index)
        log.msg("storage: slot_writev %s" % si_s)
        si_dir = storage_index_to_dir(storage_index)
        (write_enabler, renew_secret, cancel_secret) = secrets
        # shares exist if there is a file for them
        bucketdir = os.path.join(self.sharedir, si_dir)
        shares = {}
        if os.path.isdir(bucketdir):
            for sharenum_s in os.listdir(bucketdir):
                try:
                    sharenum = int(sharenum_s)
                except ValueError:
                    continue
                filename = os.path.join(bucketdir, sharenum_s)
                msf = MutableShareFile(filename, self)
                msf.check_write_enabler(write_enabler, si_s)
                shares[sharenum] = msf
        # write_enabler is good for all existing shares.

        # Now evaluate test vectors.
        testv_is_good = True
        for sharenum in test_and_write_vectors:
            (testv, datav, new_length) = test_and_write_vectors[sharenum]
            if sharenum in shares:
                if not shares[sharenum].check_testv(testv):
                    self.log("testv failed: [%d]: %r" % (sharenum, testv))
                    testv_is_good = False
                    break
            else:
                # compare the vectors against an empty share, in which all
                # reads return empty strings.
                if not EmptyShare().check_testv(testv):
                    self.log("testv failed (empty): [%d] %r" % (sharenum,
                                                                testv))
                    testv_is_good = False
                    break

        # now gather the read vectors, before we do any writes
        read_data = {}
        for sharenum, share in shares.items():
            read_data[sharenum] = share.readv(read_vector)

        ownerid = 1 # TODO
        expire_time = time.time() + 31*24*60*60   # one month
        lease_info = LeaseInfo(ownerid,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        if testv_is_good:
            # now apply the write vectors
            for sharenum in test_and_write_vectors:
                (testv, datav, new_length) = test_and_write_vectors[sharenum]
                if new_length == 0:
                    if sharenum in shares:
                        shares[sharenum].unlink()
                else:
                    if sharenum not in shares:
                        # allocate a new share
                        allocated_size = 2000 # arbitrary, really
                        share = self._allocate_slot_share(bucketdir, secrets,
                                                          sharenum,
                                                          allocated_size,
                                                          owner_num=0)
                        shares[sharenum] = share
                    shares[sharenum].writev(datav, new_length)
                    # and update the lease
                    shares[sharenum].add_or_renew_lease(lease_info)

            # delete the bucket directory if the writes above emptied it
            if os.path.isdir(bucketdir) and not os.listdir(bucketdir):
                os.rmdir(bucketdir)

        # all done
        self.add_latency("writev", time.time() - start)
        return (testv_is_good, read_data)

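    # Editorial sketch (not part of the original module): the shape of the
    # vector arguments accepted above. All values are placeholders.
    #
    #   test_and_write_vectors = {
    #       0: ( [(0, 3, "eq", "old")],  # testv: data at [0:3] must equal "old"
    #            [(0, "new")],           # datav: write "new" at offset 0
    #            None ),                 # new_length: leave the container alone
    #   }
    #   read_vector = [(0, 3)]           # read 3 bytes at offset 0 of each share
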
    def _allocate_slot_share(self, bucketdir, secrets, sharenum,
                             allocated_size, owner_num=0):
        (write_enabler, renew_secret, cancel_secret) = secrets
        my_nodeid = self.my_nodeid
        fileutil.make_dirs(bucketdir)
        filename = os.path.join(bucketdir, "%d" % sharenum)
        share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
                                         self)
        return share

    def remote_slot_readv(self, storage_index, shares, readv):
        start = time.time()
        self.count("readv")
        si_s = si_b2a(storage_index)
        lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
                     facility="tahoe.storage", level=log.OPERATIONAL)
        si_dir = storage_index_to_dir(storage_index)
        # shares exist if there is a file for them
        bucketdir = os.path.join(self.sharedir, si_dir)
        if not os.path.isdir(bucketdir):
            self.add_latency("readv", time.time() - start)
            return {}
        datavs = {}
        for sharenum_s in os.listdir(bucketdir):
            try:
                sharenum = int(sharenum_s)
            except ValueError:
                continue
            if sharenum in shares or not shares:
                filename = os.path.join(bucketdir, sharenum_s)
                msf = MutableShareFile(filename, self)
                datavs[sharenum] = msf.readv(readv)
        log.msg("returning shares %s" % (datavs.keys(),),
                facility="tahoe.storage", level=log.NOISY, parent=lp)
        self.add_latency("readv", time.time() - start)
        return datavs

    def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
                                    reason):
        fileutil.make_dirs(self.corruption_advisory_dir)
        now = time_format.iso_utc(sep="T")
        si_s = base32.b2a(storage_index)
        # windows can't handle colons in the filename
        fn = os.path.join(self.corruption_advisory_dir,
                          "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
        f = open(fn, "w")
        f.write("report: Share Corruption\n")
        f.write("type: %s\n" % share_type)
        f.write("storage_index: %s\n" % si_s)
        f.write("share_number: %d\n" % shnum)
        f.write("\n")
        f.write(reason)
        f.write("\n")
        f.close()
        log.msg(format=("client claims corruption in (%(share_type)s) " +
                        "%(si)s-%(shnum)d: %(reason)s"),
                share_type=share_type, si=si_s, shnum=shnum, reason=reason,
                level=log.SCARY, umid="SGx2fA")
        return None