break storage.py into smaller pieces in storage/*.py . No behavioral changes.
authorBrian Warner <warner@lothar.com>
Wed, 18 Feb 2009 21:46:55 +0000 (14:46 -0700)
committerBrian Warner <warner@lothar.com>
Wed, 18 Feb 2009 21:46:55 +0000 (14:46 -0700)
26 files changed:
src/allmydata/client.py
src/allmydata/immutable/encode.py
src/allmydata/immutable/layout.py
src/allmydata/immutable/offloaded.py
src/allmydata/immutable/repairer.py
src/allmydata/immutable/upload.py
src/allmydata/mutable/publish.py
src/allmydata/mutable/retrieve.py
src/allmydata/mutable/servermap.py
src/allmydata/scripts/debug.py
src/allmydata/storage.py [deleted file]
src/allmydata/storage/__init__.py [new file with mode: 0644]
src/allmydata/storage/common.py [new file with mode: 0644]
src/allmydata/storage/immutable.py [new file with mode: 0644]
src/allmydata/storage/lease.py [new file with mode: 0644]
src/allmydata/storage/mutable.py [new file with mode: 0644]
src/allmydata/storage/server.py [new file with mode: 0644]
src/allmydata/test/common.py
src/allmydata/test/no_network.py
src/allmydata/test/test_download.py
src/allmydata/test/test_helper.py
src/allmydata/test/test_mutable.py
src/allmydata/test/test_storage.py
src/allmydata/test/test_system.py
src/allmydata/test/test_web.py
src/allmydata/uri.py

index d261a3b1b242fe0cb0c77ce6d8ecd8747311f520..085b29179ebe6ef174d424d824604bcef6133533 100644 (file)
@@ -10,7 +10,7 @@ from foolscap.logging import log
 from pycryptopp.publickey import rsa
 
 import allmydata
-from allmydata.storage import StorageServer
+from allmydata.storage.server import StorageServer
 from allmydata.immutable.upload import Uploader
 from allmydata.immutable.download import Downloader
 from allmydata.immutable.filenode import FileNode, LiteralFileNode
index e13ea7cd29145fc033647aa4d7ac6f20bf572bad..e68e7b2d27aae5f3e6a7a665cac53f4bfaa13055 100644 (file)
@@ -4,7 +4,8 @@ import time
 from zope.interface import implements
 from twisted.internet import defer
 from foolscap import eventual
-from allmydata import storage, uri
+from allmydata import uri
+from allmydata.storage.server import si_b2a
 from allmydata.hashtree import HashTree
 from allmydata.util import mathutil, hashutil, base32, log
 from allmydata.util.assertutil import _assert, precondition
@@ -87,7 +88,7 @@ class Encoder(object):
 
     def __repr__(self):
         if hasattr(self, "_storage_index"):
-            return "<Encoder for %s>" % storage.si_b2a(self._storage_index)[:5]
+            return "<Encoder for %s>" % si_b2a(self._storage_index)[:5]
         return "<Encoder for unknown storage index>"
 
     def log(self, *args, **kwargs):
index 1a0bc37573e1070eb0f966b8ff047f1039c4bb6e..d1e3ef0f2a3e8fe037371698918a1924734e12b3 100644 (file)
@@ -5,7 +5,7 @@ from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
      FileTooLargeError, HASH_SIZE
 from allmydata.util import mathutil, idlib, observer
 from allmydata.util.assertutil import precondition
-from allmydata import storage
+from allmydata.storage.server import si_b2a
 
 class LayoutInvalid(Exception):
     """ There is something wrong with these bytes so they can't be interpreted as the kind of
@@ -274,7 +274,7 @@ class ReadBucketProxy:
         self._rref = rref
         self._peerid = peerid
         peer_id_s = idlib.shortnodeid_b2a(peerid)
-        storage_index_s = storage.si_b2a(storage_index)
+        storage_index_s = si_b2a(storage_index)
         self._reprstr = "<ReadBucketProxy %s to peer [%s] SI %s>" % (id(self), peer_id_s, storage_index_s)
         self._started = False # sent request to server
         self._ready = observer.OneShotObserverList() # got response from server
index 60a40624c96cd438d702715a644e0921783ed177..bbbc5c85c223dca85fef522c88a89fe523804488 100644 (file)
@@ -6,7 +6,8 @@ from twisted.internet import defer
 from foolscap import Referenceable, DeadReferenceError
 from foolscap.eventual import eventually
 import allmydata # for __full_version__
-from allmydata import interfaces, storage, uri
+from allmydata import interfaces, uri
+from allmydata.storage.server import si_b2a
 from allmydata.immutable import upload
 from allmydata.immutable.layout import ReadBucketProxy
 from allmydata.util.assertutil import precondition
@@ -86,7 +87,7 @@ class CHKCheckerAndUEBFetcher:
             self.log("no readers, so no UEB", level=log.NOISY)
             return
         b,peerid = self._readers.pop()
-        rbp = ReadBucketProxy(b, peerid, storage.si_b2a(self._storage_index))
+        rbp = ReadBucketProxy(b, peerid, si_b2a(self._storage_index))
         d = rbp.get_uri_extension()
         d.addCallback(self._got_uri_extension)
         d.addErrback(self._ueb_error)
@@ -142,7 +143,7 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
         self._helper = helper
         self._incoming_file = incoming_file
         self._encoding_file = encoding_file
-        self._upload_id = storage.si_b2a(storage_index)[:5]
+        self._upload_id = si_b2a(storage_index)[:5]
         self._log_number = log_number
         self._results = results
         self._upload_status = upload.UploadStatus()
@@ -222,7 +223,7 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
 
     def _failed(self, f):
         self.log(format="CHKUploadHelper(%(si)s) failed",
-                 si=storage.si_b2a(self._storage_index)[:5],
+                 si=si_b2a(self._storage_index)[:5],
                  failure=f,
                  level=log.UNUSUAL)
         self._finished_observers.fire(f)
@@ -573,7 +574,7 @@ class Helper(Referenceable, service.MultiService):
         self.count("chk_upload_helper.upload_requests")
         r = upload.UploadResults()
         started = time.time()
-        si_s = storage.si_b2a(storage_index)
+        si_s = si_b2a(storage_index)
         lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s)
         incoming_file = os.path.join(self._chk_incoming, si_s)
         encoding_file = os.path.join(self._chk_encoding, si_s)
index d799c7ddce22738dbb0b47d4e50fb971f83156cc..84118a480b970f2bf3f566d467da7f09808ff052 100644 (file)
@@ -1,6 +1,6 @@
 from zope.interface import implements
 from twisted.internet import defer
-from allmydata import storage
+from allmydata.storage.server import si_b2a
 from allmydata.util import log, observer
 from allmydata.util.assertutil import precondition, _assert
 from allmydata.uri import CHKFileVerifierURI
@@ -40,7 +40,7 @@ class Repairer(log.PrefixingLogMixin):
     def __init__(self, client, verifycap, monitor):
         assert precondition(isinstance(verifycap, CHKFileVerifierURI))
 
-        logprefix = storage.si_b2a(verifycap.storage_index)[:5]
+        logprefix = si_b2a(verifycap.storage_index)[:5]
         log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer", prefix=logprefix)
 
         self._client = client
index ed377daf26b4244981e2f90f4b64ea1718492cc0..36e01e2389263e413b50d9c6738ffdbf2139bb2a 100644 (file)
@@ -10,7 +10,8 @@ from allmydata.util.hashutil import file_renewal_secret_hash, \
      file_cancel_secret_hash, bucket_renewal_secret_hash, \
      bucket_cancel_secret_hash, plaintext_hasher, \
      storage_index_hash, plaintext_segment_hasher, convergence_hasher
-from allmydata import storage, hashtree, uri
+from allmydata import hashtree, uri
+from allmydata.storage.server import si_b2a
 from allmydata.immutable import encode
 from allmydata.util import base32, dictutil, idlib, log, mathutil
 from allmydata.util.assertutil import precondition
@@ -100,7 +101,7 @@ class PeerTracker:
     def __repr__(self):
         return ("<PeerTracker for peer %s and SI %s>"
                 % (idlib.shortnodeid_b2a(self.peerid),
-                   storage.si_b2a(self.storage_index)[:5]))
+                   si_b2a(self.storage_index)[:5]))
 
     def query(self, sharenums):
         d = self._storageserver.callRemote("allocate_buckets",
@@ -718,7 +719,7 @@ class CHKUploader:
         self._storage_index_elapsed = now - started
         storage_index = encoder.get_param("storage_index")
         self._storage_index = storage_index
-        upload_id = storage.si_b2a(storage_index)[:5]
+        upload_id = si_b2a(storage_index)[:5]
         self.log("using storage index %s" % upload_id)
         peer_selector = self.peer_selector_class(upload_id, self._log_number,
                                                  self._upload_status)
@@ -971,7 +972,7 @@ class AssistedUploader:
         now = self._time_contacting_helper_start = time.time()
         self._storage_index_elapsed = now - self._started
         self.log(format="contacting helper for SI %(si)s..",
-                 si=storage.si_b2a(self._storage_index))
+                 si=si_b2a(self._storage_index))
         self._upload_status.set_status("Contacting Helper")
         d = self._helper.callRemote("upload_chk", self._storage_index)
         d.addCallback(self._contacted_helper)
index 345581bbf244b2cbe82ad85064edade22efa208d..ed0ef0ebcd7e6c22f2e9ac998963f59ab3ee5282 100644 (file)
@@ -7,7 +7,8 @@ from twisted.internet import defer
 from twisted.python import failure
 from allmydata.interfaces import IPublishStatus, FileTooLargeError
 from allmydata.util import base32, hashutil, mathutil, idlib, log
-from allmydata import hashtree, codec, storage
+from allmydata import hashtree, codec
+from allmydata.storage.server import si_b2a
 from pycryptopp.cipher.aes import AES
 from foolscap.eventual import eventually
 
@@ -100,7 +101,7 @@ class Publish:
         self._node = filenode
         self._servermap = servermap
         self._storage_index = self._node.get_storage_index()
-        self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
+        self._log_prefix = prefix = si_b2a(self._storage_index)[:5]
         num = self._node._client.log("Publish(%s): starting" % prefix)
         self._log_number = num
         self._running = True
index 152b57424979aef1ccb2bf453602258dbbd8a17d..d05e5296a82e99170eaea4f0eabe6a39c065067b 100644 (file)
@@ -8,7 +8,8 @@ from foolscap import DeadReferenceError
 from foolscap.eventual import eventually, fireEventually
 from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError
 from allmydata.util import hashutil, idlib, log
-from allmydata import hashtree, codec, storage
+from allmydata import hashtree, codec
+from allmydata.storage.server import si_b2a
 from pycryptopp.cipher.aes import AES
 from pycryptopp.publickey import rsa
 
@@ -87,7 +88,7 @@ class Retrieve:
         self._storage_index = filenode.get_storage_index()
         assert self._node._readkey
         self._last_failure = None
-        prefix = storage.si_b2a(self._storage_index)[:5]
+        prefix = si_b2a(self._storage_index)[:5]
         self._log_number = log.msg("Retrieve(%s): starting" % prefix)
         self._outstanding_queries = {} # maps (peerid,shnum) to start_time
         self._running = True
index 3cca5469875db5981d9dfd45d456acf389aa78ae..7c0918ce6a9170fed23b56b31ad030a3133450a8 100644 (file)
@@ -7,7 +7,7 @@ from twisted.python import failure
 from foolscap import DeadReferenceError
 from foolscap.eventual import eventually
 from allmydata.util import base32, hashutil, idlib, log
-from allmydata import storage
+from allmydata.storage.server import si_b2a
 from allmydata.interfaces import IServermapUpdaterStatus
 from pycryptopp.publickey import rsa
 
@@ -385,7 +385,7 @@ class ServermapUpdater:
         # to ask for it during the check, we'll have problems doing the
         # publish.
 
-        prefix = storage.si_b2a(self._storage_index)[:5]
+        prefix = si_b2a(self._storage_index)[:5]
         self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
                                    si=prefix, mode=mode)
 
index b05cfd99a0aa4bffd136cff974935739767e814a..db71e0f3a7b25b8f2e200711ba4202ea07f036d4 100644 (file)
@@ -30,7 +30,7 @@ verify-cap for the file that uses the share.
         self['filename'] = filename
 
 def dump_share(options):
-    from allmydata import storage
+    from allmydata.storage.mutable import MutableShareFile
 
     out = options.stdout
 
@@ -40,18 +40,19 @@ def dump_share(options):
     f = open(options['filename'], "rb")
     prefix = f.read(32)
     f.close()
-    if prefix == storage.MutableShareFile.MAGIC:
+    if prefix == MutableShareFile.MAGIC:
         return dump_mutable_share(options)
     # otherwise assume it's immutable
     return dump_immutable_share(options)
 
 def dump_immutable_share(options):
-    from allmydata import uri, storage
+    from allmydata import uri
+    from allmydata.storage.immutable import ShareFile
     from allmydata.util import base32
     from allmydata.immutable.layout import ReadBucketProxy
 
     out = options.stdout
-    f = storage.ShareFile(options['filename'])
+    f = ShareFile(options['filename'])
     # use a ReadBucketProxy to parse the bucket and find the uri extension
     bp = ReadBucketProxy(None, '', '')
     offsets = bp._parse_offsets(f.read_share_data(0, 0x44))
@@ -153,10 +154,10 @@ def format_expiration_time(expiration_time):
 
 
 def dump_mutable_share(options):
-    from allmydata import storage
+    from allmydata.storage.mutable import MutableShareFile
     from allmydata.util import base32, idlib
     out = options.stdout
-    m = storage.MutableShareFile(options['filename'])
+    m = MutableShareFile(options['filename'])
     f = open(options['filename'], "rb")
     WE, nodeid = m._read_write_enabler_and_nodeid(f)
     num_extra_leases = m._read_num_extra_leases(f)
@@ -371,7 +372,8 @@ def _dump_secrets(storage_index, secret, nodeid, out):
             print >>out, " lease cancel secret:", base32.b2a(cancel)
 
 def dump_uri_instance(u, nodeid, secret, out, show_header=True):
-    from allmydata import storage, uri
+    from allmydata import uri
+    from allmydata.storage.server import si_b2a
     from allmydata.util import base32, hashutil
 
     if isinstance(u, uri.CHKFileURI):
@@ -381,7 +383,7 @@ def dump_uri_instance(u, nodeid, secret, out, show_header=True):
         print >>out, " UEB hash:", base32.b2a(u.uri_extension_hash)
         print >>out, " size:", u.size
         print >>out, " k/N: %d/%d" % (u.needed_shares, u.total_shares)
-        print >>out, " storage index:", storage.si_b2a(u.storage_index)
+        print >>out, " storage index:", si_b2a(u.storage_index)
         _dump_secrets(u.storage_index, secret, nodeid, out)
     elif isinstance(u, uri.CHKFileVerifierURI):
         if show_header:
@@ -389,7 +391,7 @@ def dump_uri_instance(u, nodeid, secret, out, show_header=True):
         print >>out, " UEB hash:", base32.b2a(u.uri_extension_hash)
         print >>out, " size:", u.size
         print >>out, " k/N: %d/%d" % (u.needed_shares, u.total_shares)
-        print >>out, " storage index:", storage.si_b2a(u.storage_index)
+        print >>out, " storage index:", si_b2a(u.storage_index)
 
     elif isinstance(u, uri.LiteralFileURI):
         if show_header:
@@ -401,7 +403,7 @@ def dump_uri_instance(u, nodeid, secret, out, show_header=True):
             print >>out, "SSK Writeable URI:"
         print >>out, " writekey:", base32.b2a(u.writekey)
         print >>out, " readkey:", base32.b2a(u.readkey)
-        print >>out, " storage index:", storage.si_b2a(u.storage_index)
+        print >>out, " storage index:", si_b2a(u.storage_index)
         print >>out, " fingerprint:", base32.b2a(u.fingerprint)
         print >>out
         if nodeid:
@@ -414,12 +416,12 @@ def dump_uri_instance(u, nodeid, secret, out, show_header=True):
         if show_header:
             print >>out, "SSK Read-only URI:"
         print >>out, " readkey:", base32.b2a(u.readkey)
-        print >>out, " storage index:", storage.si_b2a(u.storage_index)
+        print >>out, " storage index:", si_b2a(u.storage_index)
         print >>out, " fingerprint:", base32.b2a(u.fingerprint)
     elif isinstance(u, uri.SSKVerifierURI):
         if show_header:
             print >>out, "SSK Verifier URI:"
-        print >>out, " storage index:", storage.si_b2a(u.storage_index)
+        print >>out, " storage index:", si_b2a(u.storage_index)
         print >>out, " fingerprint:", base32.b2a(u.fingerprint)
 
     elif isinstance(u, uri.NewDirectoryURI):
@@ -470,10 +472,10 @@ def find_shares(options):
     /home/warner/testnet/node-1/storage/shares/44k/44kai1tui348689nrw8fjegc8c/9
     /home/warner/testnet/node-2/storage/shares/44k/44kai1tui348689nrw8fjegc8c/2
     """
-    from allmydata import storage
+    from allmydata.storage.server import si_a2b, storage_index_to_dir
 
     out = options.stdout
-    sharedir = storage.storage_index_to_dir(storage.si_a2b(options.si_s))
+    sharedir = storage_index_to_dir(si_a2b(options.si_s))
     for d in options.nodedirs:
         d = os.path.join(os.path.expanduser(d), "storage/shares", sharedir)
         if os.path.exists(d):
@@ -529,7 +531,9 @@ def call(c, *args, **kwargs):
     return results[0]
 
 def describe_share(abs_sharefile, si_s, shnum_s, now, out):
-    from allmydata import uri, storage
+    from allmydata import uri
+    from allmydata.storage.mutable import MutableShareFile
+    from allmydata.storage.immutable import ShareFile
     from allmydata.mutable.layout import unpack_share
     from allmydata.mutable.common import NeedMoreDataError
     from allmydata.immutable.layout import ReadBucketProxy
@@ -539,9 +543,9 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out):
     f = open(abs_sharefile, "rb")
     prefix = f.read(32)
 
-    if prefix == storage.MutableShareFile.MAGIC:
+    if prefix == MutableShareFile.MAGIC:
         # mutable share
-        m = storage.MutableShareFile(abs_sharefile)
+        m = MutableShareFile(abs_sharefile)
         WE, nodeid = m._read_write_enabler_and_nodeid(f)
         num_extra_leases = m._read_num_extra_leases(f)
         data_length = m._read_data_length(f)
@@ -594,7 +598,7 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out):
                 return defer.succeed(sf.read_share_data(offset, size))
 
         # use a ReadBucketProxy to parse the bucket and find the uri extension
-        sf = storage.ShareFile(abs_sharefile)
+        sf = ShareFile(abs_sharefile)
         bp = ImmediateReadBucketProxy(sf)
 
         expiration_time = min( [lease.expiration_time
@@ -692,7 +696,8 @@ Obviously, this command should not be used in normal operation.
 
 def corrupt_share(options):
     import random
-    from allmydata import storage
+    from allmydata.storage.mutable import MutableShareFile
+    from allmydata.storage.immutable import ShareFile
     from allmydata.mutable.layout import unpack_header
     from allmydata.immutable.layout import ReadBucketProxy
     out = options.stdout
@@ -715,9 +720,9 @@ def corrupt_share(options):
     f = open(fn, "rb")
     prefix = f.read(32)
     f.close()
-    if prefix == storage.MutableShareFile.MAGIC:
+    if prefix == MutableShareFile.MAGIC:
         # mutable
-        m = storage.MutableShareFile(fn)
+        m = MutableShareFile(fn)
         f = open(fn, "rb")
         f.seek(m.DATA_OFFSET)
         data = f.read(2000)
@@ -734,7 +739,7 @@ def corrupt_share(options):
         flip_bit(start, end)
     else:
         # otherwise assume it's immutable
-        f = storage.ShareFile(fn)
+        f = ShareFile(fn)
         bp = ReadBucketProxy(None, '', '')
         offsets = bp._parse_offsets(f.read_share_data(0, 0x24))
         start = f._data_offset + offsets["data"]
diff --git a/src/allmydata/storage.py b/src/allmydata/storage.py
deleted file mode 100644 (file)
index 29bce1d..0000000
+++ /dev/null
@@ -1,1312 +0,0 @@
-import os, re, weakref, stat, struct, time
-
-from foolscap import Referenceable
-from twisted.application import service
-
-from zope.interface import implements
-from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
-     RIBucketReader, BadWriteEnablerError, IStatsProducer
-from allmydata.util import base32, fileutil, idlib, log, time_format
-from allmydata.util.assertutil import precondition
-import allmydata # for __full_version__
-
-class DataTooLargeError(Exception):
-    pass
-
-# storage/
-# storage/shares/incoming
-#   incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
-#   be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
-# storage/shares/$START/$STORAGEINDEX
-# storage/shares/$START/$STORAGEINDEX/$SHARENUM
-
-# Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
-# base-32 chars).
-
-# $SHARENUM matches this regex:
-NUM_RE=re.compile("^[0-9]+$")
-
-# each share file (in storage/shares/$SI/$SHNUM) contains lease information
-# and share data. The share data is accessed by RIBucketWriter.write and
-# RIBucketReader.read . The lease information is not accessible through these
-# interfaces.
-
-# The share file has the following layout:
-#  0x00: share file version number, four bytes, current version is 1
-#  0x04: share data length, four bytes big-endian = A # See Footnote 1 below.
-#  0x08: number of leases, four bytes big-endian
-#  0x0c: beginning of share data (see immutable.layout.WriteBucketProxy)
-#  A+0x0c = B: first lease. Lease format is:
-#   B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner
-#   B+0x04: renew secret, 32 bytes (SHA256)
-#   B+0x24: cancel secret, 32 bytes (SHA256)
-#   B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch
-#   B+0x48: next lease, or end of record
-
-# Footnote 1: as of Tahoe v1.3.0 this field is not used by storage servers, but it is still
-# filled in by storage servers in case the storage server software gets downgraded from >= Tahoe
-# v1.3.0 to < Tahoe v1.3.0, or the share file is moved from one storage server to another.  The
-# value stored in this field is truncated, so If the actual share data length is >= 2**32, then
-# the value stored in this field will be the actual share data length modulo 2**32.
-
-def si_b2a(storageindex):
-    return base32.b2a(storageindex)
-
-def si_a2b(ascii_storageindex):
-    return base32.a2b(ascii_storageindex)
-
-def storage_index_to_dir(storageindex):
-    sia = si_b2a(storageindex)
-    return os.path.join(sia[:2], sia)
-
-class LeaseInfo:
-    def __init__(self, owner_num=None, renew_secret=None, cancel_secret=None,
-                 expiration_time=None, nodeid=None):
-        self.owner_num = owner_num
-        self.renew_secret = renew_secret
-        self.cancel_secret = cancel_secret
-        self.expiration_time = expiration_time
-        if nodeid is not None:
-            assert isinstance(nodeid, str)
-            assert len(nodeid) == 20
-        self.nodeid = nodeid
-
-    def from_immutable_data(self, data):
-        (self.owner_num,
-         self.renew_secret,
-         self.cancel_secret,
-         self.expiration_time) = struct.unpack(">L32s32sL", data)
-        self.nodeid = None
-        return self
-    def to_immutable_data(self):
-        return struct.pack(">L32s32sL",
-                           self.owner_num,
-                           self.renew_secret, self.cancel_secret,
-                           int(self.expiration_time))
-
-    def to_mutable_data(self):
-        return struct.pack(">LL32s32s20s",
-                           self.owner_num,
-                           int(self.expiration_time),
-                           self.renew_secret, self.cancel_secret,
-                           self.nodeid)
-    def from_mutable_data(self, data):
-        (self.owner_num,
-         self.expiration_time,
-         self.renew_secret, self.cancel_secret,
-         self.nodeid) = struct.unpack(">LL32s32s20s", data)
-        return self
-
-
-class ShareFile:
-    LEASE_SIZE = struct.calcsize(">L32s32sL")
-
-    def __init__(self, filename, max_size=None, create=False):
-        """ If max_size is not None then I won't allow more than max_size to be written to me. If create=True and max_size must not be None. """
-        precondition((max_size is not None) or (not create), max_size, create)
-        self.home = filename
-        self._max_size = max_size
-        if create:
-            # touch the file, so later callers will see that we're working on it.
-            # Also construct the metadata.
-            assert not os.path.exists(self.home)
-            fileutil.make_dirs(os.path.dirname(self.home))
-            f = open(self.home, 'wb')
-            # The second field -- the four-byte share data length -- is no
-            # longer used as of Tahoe v1.3.0, but we continue to write it in
-            # there in case someone downgrades a storage server from >=
-            # Tahoe-1.3.0 to < Tahoe-1.3.0, or moves a share file from one
-            # server to another, etc. We do saturation -- a share data length
-            # larger than 2**32-1 (what can fit into the field) is marked as
-            # the largest length that can fit into the field. That way, even
-            # if this does happen, the old < v1.3.0 server will still allow
-            # clients to read the first part of the share.
-            f.write(struct.pack(">LLL", 1, min(2**32-1, max_size), 0))
-            f.close()
-            self._lease_offset = max_size + 0x0c
-            self._num_leases = 0
-        else:
-            f = open(self.home, 'rb')
-            filesize = os.path.getsize(self.home)
-            (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc))
-            f.close()
-            assert version == 1, version
-            self._num_leases = num_leases
-            self._lease_offset = filesize - (num_leases * self.LEASE_SIZE)
-        self._data_offset = 0xc
-
-    def unlink(self):
-        os.unlink(self.home)
-
-    def read_share_data(self, offset, length):
-        precondition(offset >= 0)
-        # reads beyond the end of the data are truncated. Reads that start beyond the end of the
-        # data return an empty string.
-        # I wonder why Python doesn't do the following computation for me?
-        seekpos = self._data_offset+offset
-        fsize = os.path.getsize(self.home)
-        actuallength = max(0, min(length, fsize-seekpos))
-        if actuallength == 0:
-            return ""
-        f = open(self.home, 'rb')
-        f.seek(seekpos)
-        return f.read(actuallength)
-
-    def write_share_data(self, offset, data):
-        length = len(data)
-        precondition(offset >= 0, offset)
-        if self._max_size is not None and offset+length > self._max_size:
-            raise DataTooLargeError(self._max_size, offset, length)
-        f = open(self.home, 'rb+')
-        real_offset = self._data_offset+offset
-        f.seek(real_offset)
-        assert f.tell() == real_offset
-        f.write(data)
-        f.close()
-
-    def _write_lease_record(self, f, lease_number, lease_info):
-        offset = self._lease_offset + lease_number * self.LEASE_SIZE
-        f.seek(offset)
-        assert f.tell() == offset
-        f.write(lease_info.to_immutable_data())
-
-    def _read_num_leases(self, f):
-        f.seek(0x08)
-        (num_leases,) = struct.unpack(">L", f.read(4))
-        return num_leases
-
-    def _write_num_leases(self, f, num_leases):
-        f.seek(0x08)
-        f.write(struct.pack(">L", num_leases))
-
-    def _truncate_leases(self, f, num_leases):
-        f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE)
-
-    def iter_leases(self):
-        """Yields (ownernum, renew_secret, cancel_secret, expiration_time)
-        for all leases."""
-        f = open(self.home, 'rb')
-        (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc))
-        f.seek(self._lease_offset)
-        for i in range(num_leases):
-            data = f.read(self.LEASE_SIZE)
-            if data:
-                yield LeaseInfo().from_immutable_data(data)
-
-    def add_lease(self, lease_info):
-        f = open(self.home, 'rb+')
-        num_leases = self._read_num_leases(f)
-        self._write_lease_record(f, num_leases, lease_info)
-        self._write_num_leases(f, num_leases+1)
-        f.close()
-
-    def renew_lease(self, renew_secret, new_expire_time):
-        for i,lease in enumerate(self.iter_leases()):
-            if lease.renew_secret == renew_secret:
-                # yup. See if we need to update the owner time.
-                if new_expire_time > lease.expiration_time:
-                    # yes
-                    lease.expiration_time = new_expire_time
-                    f = open(self.home, 'rb+')
-                    self._write_lease_record(f, i, lease)
-                    f.close()
-                return
-        raise IndexError("unable to renew non-existent lease")
-
-    def add_or_renew_lease(self, lease_info):
-        try:
-            self.renew_lease(lease_info.renew_secret,
-                             lease_info.expiration_time)
-        except IndexError:
-            self.add_lease(lease_info)
-
-
-    def cancel_lease(self, cancel_secret):
-        """Remove a lease with the given cancel_secret. If the last lease is
-        cancelled, the file will be removed. Return the number of bytes that
-        were freed (by truncating the list of leases, and possibly by
-        deleting the file. Raise IndexError if there was no lease with the
-        given cancel_secret.
-        """
-
-        leases = list(self.iter_leases())
-        num_leases = len(leases)
-        num_leases_removed = 0
-        for i,lease in enumerate(leases[:]):
-            if lease.cancel_secret == cancel_secret:
-                leases[i] = None
-                num_leases_removed += 1
-        if not num_leases_removed:
-            raise IndexError("unable to find matching lease to cancel")
-        if num_leases_removed:
-            # pack and write out the remaining leases. We write these out in
-            # the same order as they were added, so that if we crash while
-            # doing this, we won't lose any non-cancelled leases.
-            leases = [l for l in leases if l] # remove the cancelled leases
-            f = open(self.home, 'rb+')
-            for i,lease in enumerate(leases):
-                self._write_lease_record(f, i, lease)
-            self._write_num_leases(f, len(leases))
-            self._truncate_leases(f, len(leases))
-            f.close()
-        space_freed = self.LEASE_SIZE * num_leases_removed
-        if not len(leases):
-            space_freed += os.stat(self.home)[stat.ST_SIZE]
-            self.unlink()
-        return space_freed
-
-
-class BucketWriter(Referenceable):
-    implements(RIBucketWriter)
-
-    def __init__(self, ss, incominghome, finalhome, max_size, lease_info, canary):
-        self.ss = ss
-        self.incominghome = incominghome
-        self.finalhome = finalhome
-        self._max_size = max_size # don't allow the client to write more than this
-        self._canary = canary
-        self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)
-        self.closed = False
-        self.throw_out_all_data = False
-        self._sharefile = ShareFile(incominghome, create=True, max_size=max_size)
-        # also, add our lease to the file now, so that other ones can be
-        # added by simultaneous uploaders
-        self._sharefile.add_lease(lease_info)
-
-    def allocated_size(self):
-        return self._max_size
-
-    def remote_write(self, offset, data):
-        start = time.time()
-        precondition(not self.closed)
-        if self.throw_out_all_data:
-            return
-        self._sharefile.write_share_data(offset, data)
-        self.ss.add_latency("write", time.time() - start)
-        self.ss.count("write")
-
-    def remote_close(self):
-        precondition(not self.closed)
-        start = time.time()
-
-        fileutil.make_dirs(os.path.dirname(self.finalhome))
-        fileutil.rename(self.incominghome, self.finalhome)
-        try:
-            # self.incominghome is like storage/shares/incoming/ab/abcde/4 .
-            # We try to delete the parent (.../ab/abcde) to avoid leaving
-            # these directories lying around forever, but the delete might
-            # fail if we're working on another share for the same storage
-            # index (like ab/abcde/5). The alternative approach would be to
-            # use a hierarchy of objects (PrefixHolder, BucketHolder,
-            # ShareWriter), each of which is responsible for a single
-            # directory on disk, and have them use reference counting of
-            # their children to know when they should do the rmdir. This
-            # approach is simpler, but relies on os.rmdir refusing to delete
-            # a non-empty directory. Do *not* use fileutil.rm_dir() here!
-            os.rmdir(os.path.dirname(self.incominghome))
-            # we also delete the grandparent (prefix) directory, .../ab ,
-            # again to avoid leaving directories lying around. This might
-            # fail if there is another bucket open that shares a prefix (like
-            # ab/abfff).
-            os.rmdir(os.path.dirname(os.path.dirname(self.incominghome)))
-            # we leave the great-grandparent (incoming/) directory in place.
-        except EnvironmentError:
-            # ignore the "can't rmdir because the directory is not empty"
-            # exceptions, those are normal consequences of the
-            # above-mentioned conditions.
-            pass
-        self._sharefile = None
-        self.closed = True
-        self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
-
-        filelen = os.stat(self.finalhome)[stat.ST_SIZE]
-        self.ss.bucket_writer_closed(self, filelen)
-        self.ss.add_latency("close", time.time() - start)
-        self.ss.count("close")
-
-    def _disconnected(self):
-        if not self.closed:
-            self._abort()
-
-    def remote_abort(self):
-        log.msg("storage: aborting sharefile %s" % self.incominghome,
-                facility="tahoe.storage", level=log.UNUSUAL)
-        if not self.closed:
-            self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
-        self._abort()
-        self.ss.count("abort")
-
-    def _abort(self):
-        if self.closed:
-            return
-        os.remove(self.incominghome)
-        # if we were the last share to be moved, remove the incoming/
-        # directory that was our parent
-        parentdir = os.path.split(self.incominghome)[0]
-        if not os.listdir(parentdir):
-            os.rmdir(parentdir)
-
-
-
-class BucketReader(Referenceable):
-    implements(RIBucketReader)
-
-    def __init__(self, ss, sharefname, storage_index=None, shnum=None):
-        self.ss = ss
-        self._share_file = ShareFile(sharefname)
-        self.storage_index = storage_index
-        self.shnum = shnum
-
-    def __repr__(self):
-        return "<%s %s %s>" % (self.__class__.__name__, base32.b2a_l(self.storage_index[:8], 60), self.shnum)
-
-    def remote_read(self, offset, length):
-        start = time.time()
-        data = self._share_file.read_share_data(offset, length)
-        self.ss.add_latency("read", time.time() - start)
-        self.ss.count("read")
-        return data
-
-    def remote_advise_corrupt_share(self, reason):
-        return self.ss.remote_advise_corrupt_share("immutable",
-                                                   self.storage_index,
-                                                   self.shnum,
-                                                   reason)
-
-# the MutableShareFile is like the ShareFile, but used for mutable data. It
-# has a different layout. See docs/mutable.txt for more details.
-
-# #   offset    size    name
-# 1   0         32      magic verstr "tahoe mutable container v1" plus binary
-# 2   32        20      write enabler's nodeid
-# 3   52        32      write enabler
-# 4   84        8       data size (actual share data present) (a)
-# 5   92        8       offset of (8) count of extra leases (after data)
-# 6   100       368     four leases, 92 bytes each
-#                        0    4   ownerid (0 means "no lease here")
-#                        4    4   expiration timestamp
-#                        8   32   renewal token
-#                        40  32   cancel token
-#                        72  20   nodeid which accepted the tokens
-# 7   468       (a)     data
-# 8   ??        4       count of extra leases
-# 9   ??        n*92    extra leases
-
-
-assert struct.calcsize("L"), 4 # The struct module doc says that L's are 4 bytes in size.
-assert struct.calcsize("Q"), 8 # The struct module doc says that Q's are 8 bytes in size (at least with big-endian ordering).
-
-class MutableShareFile:
-
-    DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
-    EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
-    HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
-    LEASE_SIZE = struct.calcsize(">LL32s32s20s")
-    assert LEASE_SIZE == 92
-    DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
-    assert DATA_OFFSET == 468, DATA_OFFSET
-    # our sharefiles share with a recognizable string, plus some random
-    # binary data to reduce the chance that a regular text file will look
-    # like a sharefile.
-    MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
-    assert len(MAGIC) == 32
-    MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary
-    # TODO: decide upon a policy for max share size
-
-    def __init__(self, filename, parent=None):
-        self.home = filename
-        if os.path.exists(self.home):
-            # we don't cache anything, just check the magic
-            f = open(self.home, 'rb')
-            data = f.read(self.HEADER_SIZE)
-            (magic,
-             write_enabler_nodeid, write_enabler,
-             data_length, extra_least_offset) = \
-             struct.unpack(">32s20s32sQQ", data)
-            assert magic == self.MAGIC
-        self.parent = parent # for logging
-
-    def log(self, *args, **kwargs):
-        return self.parent.log(*args, **kwargs)
-
-    def create(self, my_nodeid, write_enabler):
-        assert not os.path.exists(self.home)
-        data_length = 0
-        extra_lease_offset = (self.HEADER_SIZE
-                              + 4 * self.LEASE_SIZE
-                              + data_length)
-        assert extra_lease_offset == self.DATA_OFFSET # true at creation
-        num_extra_leases = 0
-        f = open(self.home, 'wb')
-        header = struct.pack(">32s20s32sQQ",
-                             self.MAGIC, my_nodeid, write_enabler,
-                             data_length, extra_lease_offset,
-                             )
-        leases = ("\x00"*self.LEASE_SIZE) * 4
-        f.write(header + leases)
-        # data goes here, empty after creation
-        f.write(struct.pack(">L", num_extra_leases))
-        # extra leases go here, none at creation
-        f.close()
-
-    def unlink(self):
-        os.unlink(self.home)
-
-    def _read_data_length(self, f):
-        f.seek(self.DATA_LENGTH_OFFSET)
-        (data_length,) = struct.unpack(">Q", f.read(8))
-        return data_length
-
-    def _write_data_length(self, f, data_length):
-        f.seek(self.DATA_LENGTH_OFFSET)
-        f.write(struct.pack(">Q", data_length))
-
-    def _read_share_data(self, f, offset, length):
-        precondition(offset >= 0)
-        data_length = self._read_data_length(f)
-        if offset+length > data_length:
-            # reads beyond the end of the data are truncated. Reads that
-            # start beyond the end of the data return an empty string.
-            length = max(0, data_length-offset)
-        if length == 0:
-            return ""
-        precondition(offset+length <= data_length)
-        f.seek(self.DATA_OFFSET+offset)
-        data = f.read(length)
-        return data
-
-    def _read_extra_lease_offset(self, f):
-        f.seek(self.EXTRA_LEASE_OFFSET)
-        (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
-        return extra_lease_offset
-
-    def _write_extra_lease_offset(self, f, offset):
-        f.seek(self.EXTRA_LEASE_OFFSET)
-        f.write(struct.pack(">Q", offset))
-
-    def _read_num_extra_leases(self, f):
-        offset = self._read_extra_lease_offset(f)
-        f.seek(offset)
-        (num_extra_leases,) = struct.unpack(">L", f.read(4))
-        return num_extra_leases
-
-    def _write_num_extra_leases(self, f, num_leases):
-        extra_lease_offset = self._read_extra_lease_offset(f)
-        f.seek(extra_lease_offset)
-        f.write(struct.pack(">L", num_leases))
-
-    def _change_container_size(self, f, new_container_size):
-        if new_container_size > self.MAX_SIZE:
-            raise DataTooLargeError()
-        old_extra_lease_offset = self._read_extra_lease_offset(f)
-        new_extra_lease_offset = self.DATA_OFFSET + new_container_size
-        if new_extra_lease_offset < old_extra_lease_offset:
-            # TODO: allow containers to shrink. For now they remain large.
-            return
-        num_extra_leases = self._read_num_extra_leases(f)
-        f.seek(old_extra_lease_offset)
-        extra_lease_data = f.read(4 + num_extra_leases * self.LEASE_SIZE)
-        f.seek(new_extra_lease_offset)
-        f.write(extra_lease_data)
-        # an interrupt here will corrupt the leases, iff the move caused the
-        # extra leases to overlap.
-        self._write_extra_lease_offset(f, new_extra_lease_offset)
-
-    def _write_share_data(self, f, offset, data):
-        length = len(data)
-        precondition(offset >= 0)
-        data_length = self._read_data_length(f)
-        extra_lease_offset = self._read_extra_lease_offset(f)
-
-        if offset+length >= data_length:
-            # They are expanding their data size.
-            if self.DATA_OFFSET+offset+length > extra_lease_offset:
-                # Their new data won't fit in the current container, so we
-                # have to move the leases. With luck, they're expanding it
-                # more than the size of the extra lease block, which will
-                # minimize the corrupt-the-share window
-                self._change_container_size(f, offset+length)
-                extra_lease_offset = self._read_extra_lease_offset(f)
-
-                # an interrupt here is ok.. the container has been enlarged
-                # but the data remains untouched
-
-            assert self.DATA_OFFSET+offset+length <= extra_lease_offset
-            # Their data now fits in the current container. We must write
-            # their new data and modify the recorded data size.
-            new_data_length = offset+length
-            self._write_data_length(f, new_data_length)
-            # an interrupt here will result in a corrupted share
-
-        # now all that's left to do is write out their data
-        f.seek(self.DATA_OFFSET+offset)
-        f.write(data)
-        return
-
-    def _write_lease_record(self, f, lease_number, lease_info):
-        extra_lease_offset = self._read_extra_lease_offset(f)
-        num_extra_leases = self._read_num_extra_leases(f)
-        if lease_number < 4:
-            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
-        elif (lease_number-4) < num_extra_leases:
-            offset = (extra_lease_offset
-                      + 4
-                      + (lease_number-4)*self.LEASE_SIZE)
-        else:
-            # must add an extra lease record
-            self._write_num_extra_leases(f, num_extra_leases+1)
-            offset = (extra_lease_offset
-                      + 4
-                      + (lease_number-4)*self.LEASE_SIZE)
-        f.seek(offset)
-        assert f.tell() == offset
-        f.write(lease_info.to_mutable_data())
-
-    def _read_lease_record(self, f, lease_number):
-        # returns a LeaseInfo instance, or None
-        extra_lease_offset = self._read_extra_lease_offset(f)
-        num_extra_leases = self._read_num_extra_leases(f)
-        if lease_number < 4:
-            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
-        elif (lease_number-4) < num_extra_leases:
-            offset = (extra_lease_offset
-                      + 4
-                      + (lease_number-4)*self.LEASE_SIZE)
-        else:
-            raise IndexError("No such lease number %d" % lease_number)
-        f.seek(offset)
-        assert f.tell() == offset
-        data = f.read(self.LEASE_SIZE)
-        lease_info = LeaseInfo().from_mutable_data(data)
-        if lease_info.owner_num == 0:
-            return None
-        return lease_info
-
-    def _get_num_lease_slots(self, f):
-        # how many places do we have allocated for leases? Not all of them
-        # are filled.
-        num_extra_leases = self._read_num_extra_leases(f)
-        return 4+num_extra_leases
-
-    def _get_first_empty_lease_slot(self, f):
-        # return an int with the index of an empty slot, or None if we do not
-        # currently have an empty slot
-
-        for i in range(self._get_num_lease_slots(f)):
-            if self._read_lease_record(f, i) is None:
-                return i
-        return None
-
-    def _enumerate_leases(self, f):
-        """Yields (leasenum, (ownerid, expiration_time, renew_secret,
-        cancel_secret, accepting_nodeid)) for all leases."""
-        for i in range(self._get_num_lease_slots(f)):
-            try:
-                data = self._read_lease_record(f, i)
-                if data is not None:
-                    yield (i,data)
-            except IndexError:
-                return
-
-    def debug_get_leases(self):
-        f = open(self.home, 'rb')
-        leases = list(self._enumerate_leases(f))
-        f.close()
-        return leases
-
-    def add_lease(self, lease_info):
-        precondition(lease_info.owner_num != 0) # 0 means "no lease here"
-        f = open(self.home, 'rb+')
-        num_lease_slots = self._get_num_lease_slots(f)
-        empty_slot = self._get_first_empty_lease_slot(f)
-        if empty_slot is not None:
-            self._write_lease_record(f, empty_slot, lease_info)
-        else:
-            self._write_lease_record(f, num_lease_slots, lease_info)
-        f.close()
-
-    def renew_lease(self, renew_secret, new_expire_time):
-        accepting_nodeids = set()
-        f = open(self.home, 'rb+')
-        for (leasenum,lease) in self._enumerate_leases(f):
-            if lease.renew_secret == renew_secret:
-                # yup. See if we need to update the owner time.
-                if new_expire_time > lease.expiration_time:
-                    # yes
-                    lease.expiration_time = new_expire_time
-                    self._write_lease_record(f, leasenum, lease)
-                f.close()
-                return
-            accepting_nodeids.add(lease.nodeid)
-        f.close()
-        # Return the accepting_nodeids set, to give the client a chance to
-        # update the leases on a share which has been migrated from its
-        # original server to a new one.
-        msg = ("Unable to renew non-existent lease. I have leases accepted by"
-               " nodeids: ")
-        msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
-                         for anid in accepting_nodeids])
-        msg += " ."
-        raise IndexError(msg)
-
-    def add_or_renew_lease(self, lease_info):
-        precondition(lease_info.owner_num != 0) # 0 means "no lease here"
-        try:
-            self.renew_lease(lease_info.renew_secret,
-                             lease_info.expiration_time)
-        except IndexError:
-            self.add_lease(lease_info)
-
-    def cancel_lease(self, cancel_secret):
-        """Remove any leases with the given cancel_secret. If the last lease
-        is cancelled, the file will be removed. Return the number of bytes
-        that were freed (by truncating the list of leases, and possibly by
-        deleting the file. Raise IndexError if there was no lease with the
-        given cancel_secret."""
-
-        accepting_nodeids = set()
-        modified = 0
-        remaining = 0
-        blank_lease = LeaseInfo(owner_num=0,
-                                renew_secret="\x00"*32,
-                                cancel_secret="\x00"*32,
-                                expiration_time=0,
-                                nodeid="\x00"*20)
-        f = open(self.home, 'rb+')
-        for (leasenum,lease) in self._enumerate_leases(f):
-            accepting_nodeids.add(lease.nodeid)
-            if lease.cancel_secret == cancel_secret:
-                self._write_lease_record(f, leasenum, blank_lease)
-                modified += 1
-            else:
-                remaining += 1
-        if modified:
-            freed_space = self._pack_leases(f)
-            f.close()
-            if not remaining:
-                freed_space += os.stat(self.home)[stat.ST_SIZE]
-                self.unlink()
-            return freed_space
-
-        msg = ("Unable to cancel non-existent lease. I have leases "
-               "accepted by nodeids: ")
-        msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
-                         for anid in accepting_nodeids])
-        msg += " ."
-        raise IndexError(msg)
-
-    def _pack_leases(self, f):
-        # TODO: reclaim space from cancelled leases
-        return 0
-
-    def _read_write_enabler_and_nodeid(self, f):
-        f.seek(0)
-        data = f.read(self.HEADER_SIZE)
-        (magic,
-         write_enabler_nodeid, write_enabler,
-         data_length, extra_least_offset) = \
-         struct.unpack(">32s20s32sQQ", data)
-        assert magic == self.MAGIC
-        return (write_enabler, write_enabler_nodeid)
-
-    def readv(self, readv):
-        datav = []
-        f = open(self.home, 'rb')
-        for (offset, length) in readv:
-            datav.append(self._read_share_data(f, offset, length))
-        f.close()
-        return datav
-
-#    def remote_get_length(self):
-#        f = open(self.home, 'rb')
-#        data_length = self._read_data_length(f)
-#        f.close()
-#        return data_length
-
-    def check_write_enabler(self, write_enabler, si_s):
-        f = open(self.home, 'rb+')
-        (real_write_enabler, write_enabler_nodeid) = \
-                             self._read_write_enabler_and_nodeid(f)
-        f.close()
-        if write_enabler != real_write_enabler:
-            # accomodate share migration by reporting the nodeid used for the
-            # old write enabler.
-            self.log(format="bad write enabler on SI %(si)s,"
-                     " recorded by nodeid %(nodeid)s",
-                     facility="tahoe.storage",
-                     level=log.WEIRD, umid="cE1eBQ",
-                     si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
-            msg = "The write enabler was recorded by nodeid '%s'." % \
-                  (idlib.nodeid_b2a(write_enabler_nodeid),)
-            raise BadWriteEnablerError(msg)
-
-    def check_testv(self, testv):
-        test_good = True
-        f = open(self.home, 'rb+')
-        for (offset, length, operator, specimen) in testv:
-            data = self._read_share_data(f, offset, length)
-            if not testv_compare(data, operator, specimen):
-                test_good = False
-                break
-        f.close()
-        return test_good
-
-    def writev(self, datav, new_length):
-        f = open(self.home, 'rb+')
-        for (offset, data) in datav:
-            self._write_share_data(f, offset, data)
-        if new_length is not None:
-            self._change_container_size(f, new_length)
-            f.seek(self.DATA_LENGTH_OFFSET)
-            f.write(struct.pack(">Q", new_length))
-        f.close()
-
-def testv_compare(a, op, b):
-    assert op in ("lt", "le", "eq", "ne", "ge", "gt")
-    if op == "lt":
-        return a < b
-    if op == "le":
-        return a <= b
-    if op == "eq":
-        return a == b
-    if op == "ne":
-        return a != b
-    if op == "ge":
-        return a >= b
-    if op == "gt":
-        return a > b
-    # never reached
-
-class EmptyShare:
-
-    def check_testv(self, testv):
-        test_good = True
-        for (offset, length, operator, specimen) in testv:
-            data = ""
-            if not testv_compare(data, operator, specimen):
-                test_good = False
-                break
-        return test_good
-
-def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
-    ms = MutableShareFile(filename, parent)
-    ms.create(my_nodeid, write_enabler)
-    del ms
-    return MutableShareFile(filename, parent)
-
-
-class StorageServer(service.MultiService, Referenceable):
-    implements(RIStorageServer, IStatsProducer)
-    name = 'storage'
-
-    def __init__(self, storedir, reserved_space=0,
-                 discard_storage=False, readonly_storage=False,
-                 stats_provider=None):
-        service.MultiService.__init__(self)
-        self.storedir = storedir
-        sharedir = os.path.join(storedir, "shares")
-        fileutil.make_dirs(sharedir)
-        self.sharedir = sharedir
-        # we don't actually create the corruption-advisory dir until necessary
-        self.corruption_advisory_dir = os.path.join(storedir,
-                                                    "corruption-advisories")
-        self.reserved_space = int(reserved_space)
-        self.no_storage = discard_storage
-        self.readonly_storage = readonly_storage
-        self.stats_provider = stats_provider
-        if self.stats_provider:
-            self.stats_provider.register_producer(self)
-        self.incomingdir = os.path.join(sharedir, 'incoming')
-        self._clean_incomplete()
-        fileutil.make_dirs(self.incomingdir)
-        self._active_writers = weakref.WeakKeyDictionary()
-        lp = log.msg("StorageServer created", facility="tahoe.storage")
-
-        if reserved_space:
-            if self.get_available_space() is None:
-                log.msg("warning: [storage]reserved_space= is set, but this platform does not support statvfs(2), so this reservation cannot be honored",
-                        umin="0wZ27w", level=log.UNUSUAL)
-
-        self.latencies = {"allocate": [], # immutable
-                          "write": [],
-                          "close": [],
-                          "read": [],
-                          "get": [],
-                          "writev": [], # mutable
-                          "readv": [],
-                          "add-lease": [], # both
-                          "renew": [],
-                          "cancel": [],
-                          }
-
-    def count(self, name, delta=1):
-        if self.stats_provider:
-            self.stats_provider.count("storage_server." + name, delta)
-
-    def add_latency(self, category, latency):
-        a = self.latencies[category]
-        a.append(latency)
-        if len(a) > 1000:
-            self.latencies[category] = a[-1000:]
-
-    def get_latencies(self):
-        """Return a dict, indexed by category, that contains a dict of
-        latency numbers for each category. Each dict will contain the
-        following keys: mean, 01_0_percentile, 10_0_percentile,
-        50_0_percentile (median), 90_0_percentile, 95_0_percentile,
-        99_0_percentile, 99_9_percentile. If no samples have been collected
-        for the given category, then that category name will not be present
-        in the return value."""
-        # note that Amazon's Dynamo paper says they use 99.9% percentile.
-        output = {}
-        for category in self.latencies:
-            if not self.latencies[category]:
-                continue
-            stats = {}
-            samples = self.latencies[category][:]
-            samples.sort()
-            count = len(samples)
-            stats["mean"] = sum(samples) / count
-            stats["01_0_percentile"] = samples[int(0.01 * count)]
-            stats["10_0_percentile"] = samples[int(0.1 * count)]
-            stats["50_0_percentile"] = samples[int(0.5 * count)]
-            stats["90_0_percentile"] = samples[int(0.9 * count)]
-            stats["95_0_percentile"] = samples[int(0.95 * count)]
-            stats["99_0_percentile"] = samples[int(0.99 * count)]
-            stats["99_9_percentile"] = samples[int(0.999 * count)]
-            output[category] = stats
-        return output
-
-    def log(self, *args, **kwargs):
-        if "facility" not in kwargs:
-            kwargs["facility"] = "tahoe.storage"
-        return log.msg(*args, **kwargs)
-
-    def setNodeID(self, nodeid):
-        # somebody must set this before any slots can be created or leases
-        # added
-        self.my_nodeid = nodeid
-
-    def startService(self):
-        service.MultiService.startService(self)
-        if self.parent:
-            nodeid = self.parent.nodeid # 20 bytes, binary
-            assert len(nodeid) == 20
-            self.setNodeID(nodeid)
-
-    def _clean_incomplete(self):
-        fileutil.rm_dir(self.incomingdir)
-
-    def get_stats(self):
-        # remember: RIStatsProvider requires that our return dict
-        # contains numeric values.
-        stats = { 'storage_server.allocated': self.allocated_size(), }
-        for category,ld in self.get_latencies().items():
-            for name,v in ld.items():
-                stats['storage_server.latencies.%s.%s' % (category, name)] = v
-        writeable = True
-        if self.readonly_storage:
-            writeable = False
-        try:
-            s = os.statvfs(self.storedir)
-            disk_total = s.f_bsize * s.f_blocks
-            disk_used = s.f_bsize * (s.f_blocks - s.f_bfree)
-            # spacetime predictors should look at the slope of disk_used.
-            disk_avail = s.f_bsize * s.f_bavail # available to non-root users
-            # include our local policy here: if we stop accepting shares when
-            # the available space drops below 1GB, then include that fact in
-            # disk_avail.
-            disk_avail -= self.reserved_space
-            disk_avail = max(disk_avail, 0)
-            if self.readonly_storage:
-                disk_avail = 0
-            if disk_avail == 0:
-                writeable = False
-
-            # spacetime predictors should use disk_avail / (d(disk_used)/dt)
-            stats["storage_server.disk_total"] = disk_total
-            stats["storage_server.disk_used"] = disk_used
-            stats["storage_server.disk_avail"] = disk_avail
-        except AttributeError:
-            # os.statvfs is available only on unix
-            pass
-        stats["storage_server.accepting_immutable_shares"] = int(writeable)
-        return stats
-
-
-    def stat_disk(self, d):
-        s = os.statvfs(d)
-        # s.f_bavail: available to non-root users
-        disk_avail = s.f_bsize * s.f_bavail
-        return disk_avail
-
-    def get_available_space(self):
-        # returns None if it cannot be measured (windows)
-        try:
-            disk_avail = self.stat_disk(self.storedir)
-            disk_avail -= self.reserved_space
-        except AttributeError:
-            disk_avail = None
-        if self.readonly_storage:
-            disk_avail = 0
-        return disk_avail
-
-    def allocated_size(self):
-        space = 0
-        for bw in self._active_writers:
-            space += bw.allocated_size()
-        return space
-
-    def remote_get_version(self):
-        remaining_space = self.get_available_space()
-        if remaining_space is None:
-            # we're on a platform that doesn't have 'df', so make a vague
-            # guess.
-            remaining_space = 2**64
-        version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
-                    { "maximum-immutable-share-size": remaining_space,
-                      "tolerates-immutable-read-overrun": True,
-                      "delete-mutable-shares-with-zero-length-writev": True,
-                      },
-                    "application-version": str(allmydata.__full_version__),
-                    }
-        return version
-
-    def remote_allocate_buckets(self, storage_index,
-                                renew_secret, cancel_secret,
-                                sharenums, allocated_size,
-                                canary, owner_num=0):
-        # owner_num is not for clients to set, but rather it should be
-        # curried into the PersonalStorageServer instance that is dedicated
-        # to a particular owner.
-        start = time.time()
-        self.count("allocate")
-        alreadygot = set()
-        bucketwriters = {} # k: shnum, v: BucketWriter
-        si_dir = storage_index_to_dir(storage_index)
-        si_s = si_b2a(storage_index)
-
-        log.msg("storage: allocate_buckets %s" % si_s)
-
-        # in this implementation, the lease information (including secrets)
-        # goes into the share files themselves. It could also be put into a
-        # separate database. Note that the lease should not be added until
-        # the BucketWriter has been closed.
-        expire_time = time.time() + 31*24*60*60
-        lease_info = LeaseInfo(owner_num,
-                               renew_secret, cancel_secret,
-                               expire_time, self.my_nodeid)
-
-        max_space_per_bucket = allocated_size
-
-        remaining_space = self.get_available_space()
-        limited = remaining_space is not None
-        if limited:
-            # this is a bit conservative, since some of this allocated_size()
-            # has already been written to disk, where it will show up in
-            # get_available_space.
-            remaining_space -= self.allocated_size()
-
-        # fill alreadygot with all shares that we have, not just the ones
-        # they asked about: this will save them a lot of work. Add or update
-        # leases for all of them: if they want us to hold shares for this
-        # file, they'll want us to hold leases for this file.
-        for (shnum, fn) in self._get_bucket_shares(storage_index):
-            alreadygot.add(shnum)
-            sf = ShareFile(fn)
-            sf.add_or_renew_lease(lease_info)
-
-        # self.readonly_storage causes remaining_space=0
-
-        for shnum in sharenums:
-            incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
-            finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
-            if os.path.exists(finalhome):
-                # great! we already have it. easy.
-                pass
-            elif os.path.exists(incominghome):
-                # Note that we don't create BucketWriters for shnums that
-                # have a partial share (in incoming/), so if a second upload
-                # occurs while the first is still in progress, the second
-                # uploader will use different storage servers.
-                pass
-            elif (not limited) or (remaining_space >= max_space_per_bucket):
-                # ok! we need to create the new share file.
-                bw = BucketWriter(self, incominghome, finalhome,
-                                  max_space_per_bucket, lease_info, canary)
-                if self.no_storage:
-                    bw.throw_out_all_data = True
-                bucketwriters[shnum] = bw
-                self._active_writers[bw] = 1
-                if limited:
-                    remaining_space -= max_space_per_bucket
-            else:
-                # bummer! not enough space to accept this bucket
-                pass
-
-        if bucketwriters:
-            fileutil.make_dirs(os.path.join(self.sharedir, si_dir))
-
-        self.add_latency("allocate", time.time() - start)
-        return alreadygot, bucketwriters
-
-    def _iter_share_files(self, storage_index):
-        for shnum, filename in self._get_bucket_shares(storage_index):
-            f = open(filename, 'rb')
-            header = f.read(32)
-            f.close()
-            if header[:32] == MutableShareFile.MAGIC:
-                sf = MutableShareFile(filename, self)
-                # note: if the share has been migrated, the renew_lease()
-                # call will throw an exception, with information to help the
-                # client update the lease.
-            elif header[:4] == struct.pack(">L", 1):
-                sf = ShareFile(filename)
-            else:
-                continue # non-sharefile
-            yield sf
-
-    def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
-                         owner_num=1):
-        start = time.time()
-        self.count("add-lease")
-        new_expire_time = time.time() + 31*24*60*60
-        lease_info = LeaseInfo(owner_num,
-                               renew_secret, cancel_secret,
-                               new_expire_time, self.my_nodeid)
-        for sf in self._iter_share_files(storage_index):
-            sf.add_or_renew_lease(lease_info)
-        self.add_latency("add-lease", time.time() - start)
-        return None
-
-    def remote_renew_lease(self, storage_index, renew_secret):
-        start = time.time()
-        self.count("renew")
-        new_expire_time = time.time() + 31*24*60*60
-        found_buckets = False
-        for sf in self._iter_share_files(storage_index):
-            found_buckets = True
-            sf.renew_lease(renew_secret, new_expire_time)
-        self.add_latency("renew", time.time() - start)
-        if not found_buckets:
-            raise IndexError("no such lease to renew")
-
-    def remote_cancel_lease(self, storage_index, cancel_secret):
-        start = time.time()
-        self.count("cancel")
-
-        total_space_freed = 0
-        found_buckets = False
-        for sf in self._iter_share_files(storage_index):
-            # note: if we can't find a lease on one share, we won't bother
-            # looking in the others. Unless something broke internally
-            # (perhaps we ran out of disk space while adding a lease), the
-            # leases on all shares will be identical.
-            found_buckets = True
-            # this raises IndexError if the lease wasn't present XXXX
-            total_space_freed += sf.cancel_lease(cancel_secret)
-
-        if found_buckets:
-            storagedir = os.path.join(self.sharedir,
-                                      storage_index_to_dir(storage_index))
-            if not os.listdir(storagedir):
-                os.rmdir(storagedir)
-
-        if self.stats_provider:
-            self.stats_provider.count('storage_server.bytes_freed',
-                                      total_space_freed)
-        self.add_latency("cancel", time.time() - start)
-        if not found_buckets:
-            raise IndexError("no such storage index")
-
-    def bucket_writer_closed(self, bw, consumed_size):
-        if self.stats_provider:
-            self.stats_provider.count('storage_server.bytes_added', consumed_size)
-        del self._active_writers[bw]
-
-    def _get_bucket_shares(self, storage_index):
-        """Return a list of (shnum, pathname) tuples for files that hold
-        shares for this storage_index. In each tuple, 'shnum' will always be
-        the integer form of the last component of 'pathname'."""
-        storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
-        try:
-            for f in os.listdir(storagedir):
-                if NUM_RE.match(f):
-                    filename = os.path.join(storagedir, f)
-                    yield (int(f), filename)
-        except OSError:
-            # Commonly caused by there being no buckets at all.
-            pass
-
-    def remote_get_buckets(self, storage_index):
-        start = time.time()
-        self.count("get")
-        si_s = si_b2a(storage_index)
-        log.msg("storage: get_buckets %s" % si_s)
-        bucketreaders = {} # k: sharenum, v: BucketReader
-        for shnum, filename in self._get_bucket_shares(storage_index):
-            bucketreaders[shnum] = BucketReader(self, filename,
-                                                storage_index, shnum)
-        self.add_latency("get", time.time() - start)
-        return bucketreaders
-
-    def get_leases(self, storage_index):
-        """Provide an iterator that yields all of the leases attached to this
-        bucket. Each lease is returned as a tuple of (owner_num,
-        renew_secret, cancel_secret, expiration_time).
-
-        This method is not for client use.
-        """
-
-        # since all shares get the same lease data, we just grab the leases
-        # from the first share
-        try:
-            shnum, filename = self._get_bucket_shares(storage_index).next()
-            sf = ShareFile(filename)
-            return sf.iter_leases()
-        except StopIteration:
-            return iter([])
-
-    def remote_slot_testv_and_readv_and_writev(self, storage_index,
-                                               secrets,
-                                               test_and_write_vectors,
-                                               read_vector):
-        start = time.time()
-        self.count("writev")
-        si_s = si_b2a(storage_index)
-        lp = log.msg("storage: slot_writev %s" % si_s)
-        si_dir = storage_index_to_dir(storage_index)
-        (write_enabler, renew_secret, cancel_secret) = secrets
-        # shares exist if there is a file for them
-        bucketdir = os.path.join(self.sharedir, si_dir)
-        shares = {}
-        if os.path.isdir(bucketdir):
-            for sharenum_s in os.listdir(bucketdir):
-                try:
-                    sharenum = int(sharenum_s)
-                except ValueError:
-                    continue
-                filename = os.path.join(bucketdir, sharenum_s)
-                msf = MutableShareFile(filename, self)
-                msf.check_write_enabler(write_enabler, si_s)
-                shares[sharenum] = msf
-        # write_enabler is good for all existing shares.
-
-        # Now evaluate test vectors.
-        testv_is_good = True
-        for sharenum in test_and_write_vectors:
-            (testv, datav, new_length) = test_and_write_vectors[sharenum]
-            if sharenum in shares:
-                if not shares[sharenum].check_testv(testv):
-                    self.log("testv failed: [%d]: %r" % (sharenum, testv))
-                    testv_is_good = False
-                    break
-            else:
-                # compare the vectors against an empty share, in which all
-                # reads return empty strings.
-                if not EmptyShare().check_testv(testv):
-                    self.log("testv failed (empty): [%d] %r" % (sharenum,
-                                                                testv))
-                    testv_is_good = False
-                    break
-
-        # now gather the read vectors, before we do any writes
-        read_data = {}
-        for sharenum, share in shares.items():
-            read_data[sharenum] = share.readv(read_vector)
-
-        ownerid = 1 # TODO
-        expire_time = time.time() + 31*24*60*60   # one month
-        lease_info = LeaseInfo(ownerid,
-                               renew_secret, cancel_secret,
-                               expire_time, self.my_nodeid)
-
-        if testv_is_good:
-            # now apply the write vectors
-            for sharenum in test_and_write_vectors:
-                (testv, datav, new_length) = test_and_write_vectors[sharenum]
-                if new_length == 0:
-                    if sharenum in shares:
-                        shares[sharenum].unlink()
-                else:
-                    if sharenum not in shares:
-                        # allocate a new share
-                        allocated_size = 2000 # arbitrary, really
-                        share = self._allocate_slot_share(bucketdir, secrets,
-                                                          sharenum,
-                                                          allocated_size,
-                                                          owner_num=0)
-                        shares[sharenum] = share
-                    shares[sharenum].writev(datav, new_length)
-                    # and update the lease
-                    shares[sharenum].add_or_renew_lease(lease_info)
-
-            if new_length == 0:
-                # delete empty bucket directories
-                if not os.listdir(bucketdir):
-                    os.rmdir(bucketdir)
-
-
-        # all done
-        self.add_latency("writev", time.time() - start)
-        return (testv_is_good, read_data)
-
-    def _allocate_slot_share(self, bucketdir, secrets, sharenum,
-                             allocated_size, owner_num=0):
-        (write_enabler, renew_secret, cancel_secret) = secrets
-        my_nodeid = self.my_nodeid
-        fileutil.make_dirs(bucketdir)
-        filename = os.path.join(bucketdir, "%d" % sharenum)
-        share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
-                                         self)
-        return share
-
-    def remote_slot_readv(self, storage_index, shares, readv):
-        start = time.time()
-        self.count("readv")
-        si_s = si_b2a(storage_index)
-        lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
-                     facility="tahoe.storage", level=log.OPERATIONAL)
-        si_dir = storage_index_to_dir(storage_index)
-        # shares exist if there is a file for them
-        bucketdir = os.path.join(self.sharedir, si_dir)
-        if not os.path.isdir(bucketdir):
-            self.add_latency("readv", time.time() - start)
-            return {}
-        datavs = {}
-        for sharenum_s in os.listdir(bucketdir):
-            try:
-                sharenum = int(sharenum_s)
-            except ValueError:
-                continue
-            if sharenum in shares or not shares:
-                filename = os.path.join(bucketdir, sharenum_s)
-                msf = MutableShareFile(filename, self)
-                datavs[sharenum] = msf.readv(readv)
-        log.msg("returning shares %s" % (datavs.keys(),),
-                facility="tahoe.storage", level=log.NOISY, parent=lp)
-        self.add_latency("readv", time.time() - start)
-        return datavs
-
-    def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
-                                    reason):
-        fileutil.make_dirs(self.corruption_advisory_dir)
-        now = time_format.iso_utc(sep="T")
-        si_s = base32.b2a(storage_index)
-        # windows can't handle colons in the filename
-        fn = os.path.join(self.corruption_advisory_dir,
-                          "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
-        f = open(fn, "w")
-        f.write("report: Share Corruption\n")
-        f.write("type: %s\n" % share_type)
-        f.write("storage_index: %s\n" % si_s)
-        f.write("share_number: %d\n" % shnum)
-        f.write("\n")
-        f.write(reason)
-        f.write("\n")
-        f.close()
-        log.msg(format=("client claims corruption in (%(share_type)s) " +
-                        "%(si)s-%(shnum)d: %(reason)s"),
-                share_type=share_type, si=si_s, shnum=shnum, reason=reason,
-                level=log.SCARY, umid="SGx2fA")
-        return None
diff --git a/src/allmydata/storage/__init__.py b/src/allmydata/storage/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/allmydata/storage/common.py b/src/allmydata/storage/common.py
new file mode 100644 (file)
index 0000000..695c044
--- /dev/null
@@ -0,0 +1,4 @@
+
+class DataTooLargeError(Exception):
+    pass
+
diff --git a/src/allmydata/storage/immutable.py b/src/allmydata/storage/immutable.py
new file mode 100644 (file)
index 0000000..6ebadbb
--- /dev/null
@@ -0,0 +1,308 @@
+import os, stat, struct, time
+
+from foolscap import Referenceable
+
+from zope.interface import implements
+from allmydata.interfaces import RIBucketWriter, RIBucketReader
+from allmydata.util import base32, fileutil, log
+from allmydata.util.assertutil import precondition
+from allmydata.storage.lease import LeaseInfo
+from allmydata.storage.common import DataTooLargeError
+
+# each share file (in storage/shares/$SI/$SHNUM) contains lease information
+# and share data. The share data is accessed by RIBucketWriter.write and
+# RIBucketReader.read . The lease information is not accessible through these
+# interfaces.
+
+# The share file has the following layout:
+#  0x00: share file version number, four bytes, current version is 1
+#  0x04: share data length, four bytes big-endian = A # See Footnote 1 below.
+#  0x08: number of leases, four bytes big-endian
+#  0x0c: beginning of share data (see immutable.layout.WriteBucketProxy)
+#  A+0x0c = B: first lease. Lease format is:
+#   B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner
+#   B+0x04: renew secret, 32 bytes (SHA256)
+#   B+0x24: cancel secret, 32 bytes (SHA256)
+#   B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch
+#   B+0x48: next lease, or end of record
+
+# Footnote 1: as of Tahoe v1.3.0 this field is not used by storage servers, but it is still
+# filled in by storage servers in case the storage server software gets downgraded from >= Tahoe
+# v1.3.0 to < Tahoe v1.3.0, or the share file is moved from one storage server to another.  The
+# value stored in this field is truncated, so If the actual share data length is >= 2**32, then
+# the value stored in this field will be the actual share data length modulo 2**32.
+
+class ShareFile:
+    LEASE_SIZE = struct.calcsize(">L32s32sL")
+
+    def __init__(self, filename, max_size=None, create=False):
+        """ If max_size is not None then I won't allow more than max_size to be written to me. If create=True and max_size must not be None. """
+        precondition((max_size is not None) or (not create), max_size, create)
+        self.home = filename
+        self._max_size = max_size
+        if create:
+            # touch the file, so later callers will see that we're working on it.
+            # Also construct the metadata.
+            assert not os.path.exists(self.home)
+            fileutil.make_dirs(os.path.dirname(self.home))
+            f = open(self.home, 'wb')
+            # The second field -- the four-byte share data length -- is no
+            # longer used as of Tahoe v1.3.0, but we continue to write it in
+            # there in case someone downgrades a storage server from >=
+            # Tahoe-1.3.0 to < Tahoe-1.3.0, or moves a share file from one
+            # server to another, etc. We do saturation -- a share data length
+            # larger than 2**32-1 (what can fit into the field) is marked as
+            # the largest length that can fit into the field. That way, even
+            # if this does happen, the old < v1.3.0 server will still allow
+            # clients to read the first part of the share.
+            f.write(struct.pack(">LLL", 1, min(2**32-1, max_size), 0))
+            f.close()
+            self._lease_offset = max_size + 0x0c
+            self._num_leases = 0
+        else:
+            f = open(self.home, 'rb')
+            filesize = os.path.getsize(self.home)
+            (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc))
+            f.close()
+            assert version == 1, version
+            self._num_leases = num_leases
+            self._lease_offset = filesize - (num_leases * self.LEASE_SIZE)
+        self._data_offset = 0xc
+
+    def unlink(self):
+        os.unlink(self.home)
+
+    def read_share_data(self, offset, length):
+        precondition(offset >= 0)
+        # reads beyond the end of the data are truncated. Reads that start beyond the end of the
+        # data return an empty string.
+        # I wonder why Python doesn't do the following computation for me?
+        seekpos = self._data_offset+offset
+        fsize = os.path.getsize(self.home)
+        actuallength = max(0, min(length, fsize-seekpos))
+        if actuallength == 0:
+            return ""
+        f = open(self.home, 'rb')
+        f.seek(seekpos)
+        return f.read(actuallength)
+
+    def write_share_data(self, offset, data):
+        length = len(data)
+        precondition(offset >= 0, offset)
+        if self._max_size is not None and offset+length > self._max_size:
+            raise DataTooLargeError(self._max_size, offset, length)
+        f = open(self.home, 'rb+')
+        real_offset = self._data_offset+offset
+        f.seek(real_offset)
+        assert f.tell() == real_offset
+        f.write(data)
+        f.close()
+
+    def _write_lease_record(self, f, lease_number, lease_info):
+        offset = self._lease_offset + lease_number * self.LEASE_SIZE
+        f.seek(offset)
+        assert f.tell() == offset
+        f.write(lease_info.to_immutable_data())
+
+    def _read_num_leases(self, f):
+        f.seek(0x08)
+        (num_leases,) = struct.unpack(">L", f.read(4))
+        return num_leases
+
+    def _write_num_leases(self, f, num_leases):
+        f.seek(0x08)
+        f.write(struct.pack(">L", num_leases))
+
+    def _truncate_leases(self, f, num_leases):
+        f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE)
+
+    def iter_leases(self):
+        """Yields (ownernum, renew_secret, cancel_secret, expiration_time)
+        for all leases."""
+        f = open(self.home, 'rb')
+        (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc))
+        f.seek(self._lease_offset)
+        for i in range(num_leases):
+            data = f.read(self.LEASE_SIZE)
+            if data:
+                yield LeaseInfo().from_immutable_data(data)
+
+    def add_lease(self, lease_info):
+        f = open(self.home, 'rb+')
+        num_leases = self._read_num_leases(f)
+        self._write_lease_record(f, num_leases, lease_info)
+        self._write_num_leases(f, num_leases+1)
+        f.close()
+
+    def renew_lease(self, renew_secret, new_expire_time):
+        for i,lease in enumerate(self.iter_leases()):
+            if lease.renew_secret == renew_secret:
+                # yup. See if we need to update the owner time.
+                if new_expire_time > lease.expiration_time:
+                    # yes
+                    lease.expiration_time = new_expire_time
+                    f = open(self.home, 'rb+')
+                    self._write_lease_record(f, i, lease)
+                    f.close()
+                return
+        raise IndexError("unable to renew non-existent lease")
+
+    def add_or_renew_lease(self, lease_info):
+        try:
+            self.renew_lease(lease_info.renew_secret,
+                             lease_info.expiration_time)
+        except IndexError:
+            self.add_lease(lease_info)
+
+
+    def cancel_lease(self, cancel_secret):
+        """Remove a lease with the given cancel_secret. If the last lease is
+        cancelled, the file will be removed. Return the number of bytes that
+        were freed (by truncating the list of leases, and possibly by
+        deleting the file. Raise IndexError if there was no lease with the
+        given cancel_secret.
+        """
+
+        leases = list(self.iter_leases())
+        num_leases = len(leases)
+        num_leases_removed = 0
+        for i,lease in enumerate(leases[:]):
+            if lease.cancel_secret == cancel_secret:
+                leases[i] = None
+                num_leases_removed += 1
+        if not num_leases_removed:
+            raise IndexError("unable to find matching lease to cancel")
+        if num_leases_removed:
+            # pack and write out the remaining leases. We write these out in
+            # the same order as they were added, so that if we crash while
+            # doing this, we won't lose any non-cancelled leases.
+            leases = [l for l in leases if l] # remove the cancelled leases
+            f = open(self.home, 'rb+')
+            for i,lease in enumerate(leases):
+                self._write_lease_record(f, i, lease)
+            self._write_num_leases(f, len(leases))
+            self._truncate_leases(f, len(leases))
+            f.close()
+        space_freed = self.LEASE_SIZE * num_leases_removed
+        if not len(leases):
+            space_freed += os.stat(self.home)[stat.ST_SIZE]
+            self.unlink()
+        return space_freed
+
+
+class BucketWriter(Referenceable):
+    implements(RIBucketWriter)
+
+    def __init__(self, ss, incominghome, finalhome, max_size, lease_info, canary):
+        self.ss = ss
+        self.incominghome = incominghome
+        self.finalhome = finalhome
+        self._max_size = max_size # don't allow the client to write more than this
+        self._canary = canary
+        self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)
+        self.closed = False
+        self.throw_out_all_data = False
+        self._sharefile = ShareFile(incominghome, create=True, max_size=max_size)
+        # also, add our lease to the file now, so that other ones can be
+        # added by simultaneous uploaders
+        self._sharefile.add_lease(lease_info)
+
+    def allocated_size(self):
+        return self._max_size
+
+    def remote_write(self, offset, data):
+        start = time.time()
+        precondition(not self.closed)
+        if self.throw_out_all_data:
+            return
+        self._sharefile.write_share_data(offset, data)
+        self.ss.add_latency("write", time.time() - start)
+        self.ss.count("write")
+
+    def remote_close(self):
+        precondition(not self.closed)
+        start = time.time()
+
+        fileutil.make_dirs(os.path.dirname(self.finalhome))
+        fileutil.rename(self.incominghome, self.finalhome)
+        try:
+            # self.incominghome is like storage/shares/incoming/ab/abcde/4 .
+            # We try to delete the parent (.../ab/abcde) to avoid leaving
+            # these directories lying around forever, but the delete might
+            # fail if we're working on another share for the same storage
+            # index (like ab/abcde/5). The alternative approach would be to
+            # use a hierarchy of objects (PrefixHolder, BucketHolder,
+            # ShareWriter), each of which is responsible for a single
+            # directory on disk, and have them use reference counting of
+            # their children to know when they should do the rmdir. This
+            # approach is simpler, but relies on os.rmdir refusing to delete
+            # a non-empty directory. Do *not* use fileutil.rm_dir() here!
+            os.rmdir(os.path.dirname(self.incominghome))
+            # we also delete the grandparent (prefix) directory, .../ab ,
+            # again to avoid leaving directories lying around. This might
+            # fail if there is another bucket open that shares a prefix (like
+            # ab/abfff).
+            os.rmdir(os.path.dirname(os.path.dirname(self.incominghome)))
+            # we leave the great-grandparent (incoming/) directory in place.
+        except EnvironmentError:
+            # ignore the "can't rmdir because the directory is not empty"
+            # exceptions, those are normal consequences of the
+            # above-mentioned conditions.
+            pass
+        self._sharefile = None
+        self.closed = True
+        self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
+
+        filelen = os.stat(self.finalhome)[stat.ST_SIZE]
+        self.ss.bucket_writer_closed(self, filelen)
+        self.ss.add_latency("close", time.time() - start)
+        self.ss.count("close")
+
+    def _disconnected(self):
+        if not self.closed:
+            self._abort()
+
+    def remote_abort(self):
+        log.msg("storage: aborting sharefile %s" % self.incominghome,
+                facility="tahoe.storage", level=log.UNUSUAL)
+        if not self.closed:
+            self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
+        self._abort()
+        self.ss.count("abort")
+
+    def _abort(self):
+        if self.closed:
+            return
+        os.remove(self.incominghome)
+        # if we were the last share to be moved, remove the incoming/
+        # directory that was our parent
+        parentdir = os.path.split(self.incominghome)[0]
+        if not os.listdir(parentdir):
+            os.rmdir(parentdir)
+
+
+
+class BucketReader(Referenceable):
+    implements(RIBucketReader)
+
+    def __init__(self, ss, sharefname, storage_index=None, shnum=None):
+        self.ss = ss
+        self._share_file = ShareFile(sharefname)
+        self.storage_index = storage_index
+        self.shnum = shnum
+
+    def __repr__(self):
+        return "<%s %s %s>" % (self.__class__.__name__, base32.b2a_l(self.storage_index[:8], 60), self.shnum)
+
+    def remote_read(self, offset, length):
+        start = time.time()
+        data = self._share_file.read_share_data(offset, length)
+        self.ss.add_latency("read", time.time() - start)
+        self.ss.count("read")
+        return data
+
+    def remote_advise_corrupt_share(self, reason):
+        return self.ss.remote_advise_corrupt_share("immutable",
+                                                   self.storage_index,
+                                                   self.shnum,
+                                                   reason)
diff --git a/src/allmydata/storage/lease.py b/src/allmydata/storage/lease.py
new file mode 100644 (file)
index 0000000..2b91d87
--- /dev/null
@@ -0,0 +1,41 @@
+
+import struct
+
+class LeaseInfo:
+    def __init__(self, owner_num=None, renew_secret=None, cancel_secret=None,
+                 expiration_time=None, nodeid=None):
+        self.owner_num = owner_num
+        self.renew_secret = renew_secret
+        self.cancel_secret = cancel_secret
+        self.expiration_time = expiration_time
+        if nodeid is not None:
+            assert isinstance(nodeid, str)
+            assert len(nodeid) == 20
+        self.nodeid = nodeid
+
+    def from_immutable_data(self, data):
+        (self.owner_num,
+         self.renew_secret,
+         self.cancel_secret,
+         self.expiration_time) = struct.unpack(">L32s32sL", data)
+        self.nodeid = None
+        return self
+    def to_immutable_data(self):
+        return struct.pack(">L32s32sL",
+                           self.owner_num,
+                           self.renew_secret, self.cancel_secret,
+                           int(self.expiration_time))
+
+    def to_mutable_data(self):
+        return struct.pack(">LL32s32s20s",
+                           self.owner_num,
+                           int(self.expiration_time),
+                           self.renew_secret, self.cancel_secret,
+                           self.nodeid)
+    def from_mutable_data(self, data):
+        (self.owner_num,
+         self.expiration_time,
+         self.renew_secret, self.cancel_secret,
+         self.nodeid) = struct.unpack(">LL32s32s20s", data)
+        return self
+
diff --git a/src/allmydata/storage/mutable.py b/src/allmydata/storage/mutable.py
new file mode 100644 (file)
index 0000000..de78a65
--- /dev/null
@@ -0,0 +1,429 @@
+import os, stat, struct
+
+from allmydata.interfaces import BadWriteEnablerError
+from allmydata.util import idlib, log
+from allmydata.util.assertutil import precondition
+from allmydata.storage.lease import LeaseInfo
+from allmydata.storage.common import DataTooLargeError
+
+# the MutableShareFile is like the ShareFile, but used for mutable data. It
+# has a different layout. See docs/mutable.txt for more details.
+
+# #   offset    size    name
+# 1   0         32      magic verstr "tahoe mutable container v1" plus binary
+# 2   32        20      write enabler's nodeid
+# 3   52        32      write enabler
+# 4   84        8       data size (actual share data present) (a)
+# 5   92        8       offset of (8) count of extra leases (after data)
+# 6   100       368     four leases, 92 bytes each
+#                        0    4   ownerid (0 means "no lease here")
+#                        4    4   expiration timestamp
+#                        8   32   renewal token
+#                        40  32   cancel token
+#                        72  20   nodeid which accepted the tokens
+# 7   468       (a)     data
+# 8   ??        4       count of extra leases
+# 9   ??        n*92    extra leases
+
+
+assert struct.calcsize("L"), 4 # The struct module doc says that L's are 4 bytes in size.
+assert struct.calcsize("Q"), 8 # The struct module doc says that Q's are 8 bytes in size (at least with big-endian ordering).
+
+class MutableShareFile:
+
+    DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
+    EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
+    HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
+    LEASE_SIZE = struct.calcsize(">LL32s32s20s")
+    assert LEASE_SIZE == 92
+    DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
+    assert DATA_OFFSET == 468, DATA_OFFSET
+    # our sharefiles share with a recognizable string, plus some random
+    # binary data to reduce the chance that a regular text file will look
+    # like a sharefile.
+    MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
+    assert len(MAGIC) == 32
+    MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary
+    # TODO: decide upon a policy for max share size
+
+    def __init__(self, filename, parent=None):
+        self.home = filename
+        if os.path.exists(self.home):
+            # we don't cache anything, just check the magic
+            f = open(self.home, 'rb')
+            data = f.read(self.HEADER_SIZE)
+            (magic,
+             write_enabler_nodeid, write_enabler,
+             data_length, extra_least_offset) = \
+             struct.unpack(">32s20s32sQQ", data)
+            assert magic == self.MAGIC
+        self.parent = parent # for logging
+
+    def log(self, *args, **kwargs):
+        return self.parent.log(*args, **kwargs)
+
+    def create(self, my_nodeid, write_enabler):
+        assert not os.path.exists(self.home)
+        data_length = 0
+        extra_lease_offset = (self.HEADER_SIZE
+                              + 4 * self.LEASE_SIZE
+                              + data_length)
+        assert extra_lease_offset == self.DATA_OFFSET # true at creation
+        num_extra_leases = 0
+        f = open(self.home, 'wb')
+        header = struct.pack(">32s20s32sQQ",
+                             self.MAGIC, my_nodeid, write_enabler,
+                             data_length, extra_lease_offset,
+                             )
+        leases = ("\x00"*self.LEASE_SIZE) * 4
+        f.write(header + leases)
+        # data goes here, empty after creation
+        f.write(struct.pack(">L", num_extra_leases))
+        # extra leases go here, none at creation
+        f.close()
+
+    def unlink(self):
+        os.unlink(self.home)
+
+    def _read_data_length(self, f):
+        f.seek(self.DATA_LENGTH_OFFSET)
+        (data_length,) = struct.unpack(">Q", f.read(8))
+        return data_length
+
+    def _write_data_length(self, f, data_length):
+        f.seek(self.DATA_LENGTH_OFFSET)
+        f.write(struct.pack(">Q", data_length))
+
+    def _read_share_data(self, f, offset, length):
+        precondition(offset >= 0)
+        data_length = self._read_data_length(f)
+        if offset+length > data_length:
+            # reads beyond the end of the data are truncated. Reads that
+            # start beyond the end of the data return an empty string.
+            length = max(0, data_length-offset)
+        if length == 0:
+            return ""
+        precondition(offset+length <= data_length)
+        f.seek(self.DATA_OFFSET+offset)
+        data = f.read(length)
+        return data
+
+    def _read_extra_lease_offset(self, f):
+        f.seek(self.EXTRA_LEASE_OFFSET)
+        (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
+        return extra_lease_offset
+
+    def _write_extra_lease_offset(self, f, offset):
+        f.seek(self.EXTRA_LEASE_OFFSET)
+        f.write(struct.pack(">Q", offset))
+
+    def _read_num_extra_leases(self, f):
+        offset = self._read_extra_lease_offset(f)
+        f.seek(offset)
+        (num_extra_leases,) = struct.unpack(">L", f.read(4))
+        return num_extra_leases
+
+    def _write_num_extra_leases(self, f, num_leases):
+        extra_lease_offset = self._read_extra_lease_offset(f)
+        f.seek(extra_lease_offset)
+        f.write(struct.pack(">L", num_leases))
+
+    def _change_container_size(self, f, new_container_size):
+        if new_container_size > self.MAX_SIZE:
+            raise DataTooLargeError()
+        old_extra_lease_offset = self._read_extra_lease_offset(f)
+        new_extra_lease_offset = self.DATA_OFFSET + new_container_size
+        if new_extra_lease_offset < old_extra_lease_offset:
+            # TODO: allow containers to shrink. For now they remain large.
+            return
+        num_extra_leases = self._read_num_extra_leases(f)
+        f.seek(old_extra_lease_offset)
+        extra_lease_data = f.read(4 + num_extra_leases * self.LEASE_SIZE)
+        f.seek(new_extra_lease_offset)
+        f.write(extra_lease_data)
+        # an interrupt here will corrupt the leases, iff the move caused the
+        # extra leases to overlap.
+        self._write_extra_lease_offset(f, new_extra_lease_offset)
+
+    def _write_share_data(self, f, offset, data):
+        length = len(data)
+        precondition(offset >= 0)
+        data_length = self._read_data_length(f)
+        extra_lease_offset = self._read_extra_lease_offset(f)
+
+        if offset+length >= data_length:
+            # They are expanding their data size.
+            if self.DATA_OFFSET+offset+length > extra_lease_offset:
+                # Their new data won't fit in the current container, so we
+                # have to move the leases. With luck, they're expanding it
+                # more than the size of the extra lease block, which will
+                # minimize the corrupt-the-share window
+                self._change_container_size(f, offset+length)
+                extra_lease_offset = self._read_extra_lease_offset(f)
+
+                # an interrupt here is ok.. the container has been enlarged
+                # but the data remains untouched
+
+            assert self.DATA_OFFSET+offset+length <= extra_lease_offset
+            # Their data now fits in the current container. We must write
+            # their new data and modify the recorded data size.
+            new_data_length = offset+length
+            self._write_data_length(f, new_data_length)
+            # an interrupt here will result in a corrupted share
+
+        # now all that's left to do is write out their data
+        f.seek(self.DATA_OFFSET+offset)
+        f.write(data)
+        return
+
+    def _write_lease_record(self, f, lease_number, lease_info):
+        extra_lease_offset = self._read_extra_lease_offset(f)
+        num_extra_leases = self._read_num_extra_leases(f)
+        if lease_number < 4:
+            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
+        elif (lease_number-4) < num_extra_leases:
+            offset = (extra_lease_offset
+                      + 4
+                      + (lease_number-4)*self.LEASE_SIZE)
+        else:
+            # must add an extra lease record
+            self._write_num_extra_leases(f, num_extra_leases+1)
+            offset = (extra_lease_offset
+                      + 4
+                      + (lease_number-4)*self.LEASE_SIZE)
+        f.seek(offset)
+        assert f.tell() == offset
+        f.write(lease_info.to_mutable_data())
+
+    def _read_lease_record(self, f, lease_number):
+        # returns a LeaseInfo instance, or None
+        extra_lease_offset = self._read_extra_lease_offset(f)
+        num_extra_leases = self._read_num_extra_leases(f)
+        if lease_number < 4:
+            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
+        elif (lease_number-4) < num_extra_leases:
+            offset = (extra_lease_offset
+                      + 4
+                      + (lease_number-4)*self.LEASE_SIZE)
+        else:
+            raise IndexError("No such lease number %d" % lease_number)
+        f.seek(offset)
+        assert f.tell() == offset
+        data = f.read(self.LEASE_SIZE)
+        lease_info = LeaseInfo().from_mutable_data(data)
+        if lease_info.owner_num == 0:
+            return None
+        return lease_info
+
+    def _get_num_lease_slots(self, f):
+        # how many places do we have allocated for leases? Not all of them
+        # are filled.
+        num_extra_leases = self._read_num_extra_leases(f)
+        return 4+num_extra_leases
+
+    def _get_first_empty_lease_slot(self, f):
+        # return an int with the index of an empty slot, or None if we do not
+        # currently have an empty slot
+
+        for i in range(self._get_num_lease_slots(f)):
+            if self._read_lease_record(f, i) is None:
+                return i
+        return None
+
+    def _enumerate_leases(self, f):
+        """Yields (leasenum, (ownerid, expiration_time, renew_secret,
+        cancel_secret, accepting_nodeid)) for all leases."""
+        for i in range(self._get_num_lease_slots(f)):
+            try:
+                data = self._read_lease_record(f, i)
+                if data is not None:
+                    yield (i,data)
+            except IndexError:
+                return
+
+    def debug_get_leases(self):
+        f = open(self.home, 'rb')
+        leases = list(self._enumerate_leases(f))
+        f.close()
+        return leases
+
+    def add_lease(self, lease_info):
+        precondition(lease_info.owner_num != 0) # 0 means "no lease here"
+        f = open(self.home, 'rb+')
+        num_lease_slots = self._get_num_lease_slots(f)
+        empty_slot = self._get_first_empty_lease_slot(f)
+        if empty_slot is not None:
+            self._write_lease_record(f, empty_slot, lease_info)
+        else:
+            self._write_lease_record(f, num_lease_slots, lease_info)
+        f.close()
+
+    def renew_lease(self, renew_secret, new_expire_time):
+        accepting_nodeids = set()
+        f = open(self.home, 'rb+')
+        for (leasenum,lease) in self._enumerate_leases(f):
+            if lease.renew_secret == renew_secret:
+                # yup. See if we need to update the owner time.
+                if new_expire_time > lease.expiration_time:
+                    # yes
+                    lease.expiration_time = new_expire_time
+                    self._write_lease_record(f, leasenum, lease)
+                f.close()
+                return
+            accepting_nodeids.add(lease.nodeid)
+        f.close()
+        # Return the accepting_nodeids set, to give the client a chance to
+        # update the leases on a share which has been migrated from its
+        # original server to a new one.
+        msg = ("Unable to renew non-existent lease. I have leases accepted by"
+               " nodeids: ")
+        msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
+                         for anid in accepting_nodeids])
+        msg += " ."
+        raise IndexError(msg)
+
+    def add_or_renew_lease(self, lease_info):
+        precondition(lease_info.owner_num != 0) # 0 means "no lease here"
+        try:
+            self.renew_lease(lease_info.renew_secret,
+                             lease_info.expiration_time)
+        except IndexError:
+            self.add_lease(lease_info)
+
+    def cancel_lease(self, cancel_secret):
+        """Remove any leases with the given cancel_secret. If the last lease
+        is cancelled, the file will be removed. Return the number of bytes
+        that were freed (by truncating the list of leases, and possibly by
+        deleting the file. Raise IndexError if there was no lease with the
+        given cancel_secret."""
+
+        accepting_nodeids = set()
+        modified = 0
+        remaining = 0
+        blank_lease = LeaseInfo(owner_num=0,
+                                renew_secret="\x00"*32,
+                                cancel_secret="\x00"*32,
+                                expiration_time=0,
+                                nodeid="\x00"*20)
+        f = open(self.home, 'rb+')
+        for (leasenum,lease) in self._enumerate_leases(f):
+            accepting_nodeids.add(lease.nodeid)
+            if lease.cancel_secret == cancel_secret:
+                self._write_lease_record(f, leasenum, blank_lease)
+                modified += 1
+            else:
+                remaining += 1
+        if modified:
+            freed_space = self._pack_leases(f)
+            f.close()
+            if not remaining:
+                freed_space += os.stat(self.home)[stat.ST_SIZE]
+                self.unlink()
+            return freed_space
+
+        msg = ("Unable to cancel non-existent lease. I have leases "
+               "accepted by nodeids: ")
+        msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
+                         for anid in accepting_nodeids])
+        msg += " ."
+        raise IndexError(msg)
+
+    def _pack_leases(self, f):
+        # TODO: reclaim space from cancelled leases
+        return 0
+
+    def _read_write_enabler_and_nodeid(self, f):
+        f.seek(0)
+        data = f.read(self.HEADER_SIZE)
+        (magic,
+         write_enabler_nodeid, write_enabler,
+         data_length, extra_least_offset) = \
+         struct.unpack(">32s20s32sQQ", data)
+        assert magic == self.MAGIC
+        return (write_enabler, write_enabler_nodeid)
+
+    def readv(self, readv):
+        datav = []
+        f = open(self.home, 'rb')
+        for (offset, length) in readv:
+            datav.append(self._read_share_data(f, offset, length))
+        f.close()
+        return datav
+
+#    def remote_get_length(self):
+#        f = open(self.home, 'rb')
+#        data_length = self._read_data_length(f)
+#        f.close()
+#        return data_length
+
+    def check_write_enabler(self, write_enabler, si_s):
+        f = open(self.home, 'rb+')
+        (real_write_enabler, write_enabler_nodeid) = \
+                             self._read_write_enabler_and_nodeid(f)
+        f.close()
+        if write_enabler != real_write_enabler:
+            # accomodate share migration by reporting the nodeid used for the
+            # old write enabler.
+            self.log(format="bad write enabler on SI %(si)s,"
+                     " recorded by nodeid %(nodeid)s",
+                     facility="tahoe.storage",
+                     level=log.WEIRD, umid="cE1eBQ",
+                     si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
+            msg = "The write enabler was recorded by nodeid '%s'." % \
+                  (idlib.nodeid_b2a(write_enabler_nodeid),)
+            raise BadWriteEnablerError(msg)
+
+    def check_testv(self, testv):
+        test_good = True
+        f = open(self.home, 'rb+')
+        for (offset, length, operator, specimen) in testv:
+            data = self._read_share_data(f, offset, length)
+            if not testv_compare(data, operator, specimen):
+                test_good = False
+                break
+        f.close()
+        return test_good
+
+    def writev(self, datav, new_length):
+        f = open(self.home, 'rb+')
+        for (offset, data) in datav:
+            self._write_share_data(f, offset, data)
+        if new_length is not None:
+            self._change_container_size(f, new_length)
+            f.seek(self.DATA_LENGTH_OFFSET)
+            f.write(struct.pack(">Q", new_length))
+        f.close()
+
+def testv_compare(a, op, b):
+    assert op in ("lt", "le", "eq", "ne", "ge", "gt")
+    if op == "lt":
+        return a < b
+    if op == "le":
+        return a <= b
+    if op == "eq":
+        return a == b
+    if op == "ne":
+        return a != b
+    if op == "ge":
+        return a >= b
+    if op == "gt":
+        return a > b
+    # never reached
+
+class EmptyShare:
+
+    def check_testv(self, testv):
+        test_good = True
+        for (offset, length, operator, specimen) in testv:
+            data = ""
+            if not testv_compare(data, operator, specimen):
+                test_good = False
+                break
+        return test_good
+
+def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
+    ms = MutableShareFile(filename, parent)
+    ms.create(my_nodeid, write_enabler)
+    del ms
+    return MutableShareFile(filename, parent)
+
diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py
new file mode 100644 (file)
index 0000000..fdc2a1a
--- /dev/null
@@ -0,0 +1,556 @@
+import os, re, weakref, struct, time
+
+from foolscap import Referenceable
+from twisted.application import service
+
+from zope.interface import implements
+from allmydata.interfaces import RIStorageServer, IStatsProducer
+from allmydata.util import base32, fileutil, log, time_format
+import allmydata # for __full_version__
+
+from allmydata.storage.lease import LeaseInfo
+from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
+     create_mutable_sharefile
+from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
+
+# storage/
+# storage/shares/incoming
+#   incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
+#   be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
+# storage/shares/$START/$STORAGEINDEX
+# storage/shares/$START/$STORAGEINDEX/$SHARENUM
+
+# Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
+# base-32 chars).
+
+# $SHARENUM matches this regex:
+NUM_RE=re.compile("^[0-9]+$")
+
+def si_b2a(storageindex):
+    return base32.b2a(storageindex)
+
+def si_a2b(ascii_storageindex):
+    return base32.a2b(ascii_storageindex)
+
+def storage_index_to_dir(storageindex):
+    sia = si_b2a(storageindex)
+    return os.path.join(sia[:2], sia)
+
+
+
+class StorageServer(service.MultiService, Referenceable):
+    implements(RIStorageServer, IStatsProducer)
+    name = 'storage'
+
+    def __init__(self, storedir, reserved_space=0,
+                 discard_storage=False, readonly_storage=False,
+                 stats_provider=None):
+        service.MultiService.__init__(self)
+        self.storedir = storedir
+        sharedir = os.path.join(storedir, "shares")
+        fileutil.make_dirs(sharedir)
+        self.sharedir = sharedir
+        # we don't actually create the corruption-advisory dir until necessary
+        self.corruption_advisory_dir = os.path.join(storedir,
+                                                    "corruption-advisories")
+        self.reserved_space = int(reserved_space)
+        self.no_storage = discard_storage
+        self.readonly_storage = readonly_storage
+        self.stats_provider = stats_provider
+        if self.stats_provider:
+            self.stats_provider.register_producer(self)
+        self.incomingdir = os.path.join(sharedir, 'incoming')
+        self._clean_incomplete()
+        fileutil.make_dirs(self.incomingdir)
+        self._active_writers = weakref.WeakKeyDictionary()
+        lp = log.msg("StorageServer created", facility="tahoe.storage")
+
+        if reserved_space:
+            if self.get_available_space() is None:
+                log.msg("warning: [storage]reserved_space= is set, but this platform does not support statvfs(2), so this reservation cannot be honored",
+                        umin="0wZ27w", level=log.UNUSUAL)
+
+        self.latencies = {"allocate": [], # immutable
+                          "write": [],
+                          "close": [],
+                          "read": [],
+                          "get": [],
+                          "writev": [], # mutable
+                          "readv": [],
+                          "add-lease": [], # both
+                          "renew": [],
+                          "cancel": [],
+                          }
+
+    def count(self, name, delta=1):
+        if self.stats_provider:
+            self.stats_provider.count("storage_server." + name, delta)
+
+    def add_latency(self, category, latency):
+        a = self.latencies[category]
+        a.append(latency)
+        if len(a) > 1000:
+            self.latencies[category] = a[-1000:]
+
+    def get_latencies(self):
+        """Return a dict, indexed by category, that contains a dict of
+        latency numbers for each category. Each dict will contain the
+        following keys: mean, 01_0_percentile, 10_0_percentile,
+        50_0_percentile (median), 90_0_percentile, 95_0_percentile,
+        99_0_percentile, 99_9_percentile. If no samples have been collected
+        for the given category, then that category name will not be present
+        in the return value."""
+        # note that Amazon's Dynamo paper says they use 99.9% percentile.
+        output = {}
+        for category in self.latencies:
+            if not self.latencies[category]:
+                continue
+            stats = {}
+            samples = self.latencies[category][:]
+            samples.sort()
+            count = len(samples)
+            stats["mean"] = sum(samples) / count
+            stats["01_0_percentile"] = samples[int(0.01 * count)]
+            stats["10_0_percentile"] = samples[int(0.1 * count)]
+            stats["50_0_percentile"] = samples[int(0.5 * count)]
+            stats["90_0_percentile"] = samples[int(0.9 * count)]
+            stats["95_0_percentile"] = samples[int(0.95 * count)]
+            stats["99_0_percentile"] = samples[int(0.99 * count)]
+            stats["99_9_percentile"] = samples[int(0.999 * count)]
+            output[category] = stats
+        return output
+
+    def log(self, *args, **kwargs):
+        if "facility" not in kwargs:
+            kwargs["facility"] = "tahoe.storage"
+        return log.msg(*args, **kwargs)
+
+    def setNodeID(self, nodeid):
+        # somebody must set this before any slots can be created or leases
+        # added
+        self.my_nodeid = nodeid
+
+    def startService(self):
+        service.MultiService.startService(self)
+        if self.parent:
+            nodeid = self.parent.nodeid # 20 bytes, binary
+            assert len(nodeid) == 20
+            self.setNodeID(nodeid)
+
+    def _clean_incomplete(self):
+        fileutil.rm_dir(self.incomingdir)
+
+    def get_stats(self):
+        # remember: RIStatsProvider requires that our return dict
+        # contains numeric values.
+        stats = { 'storage_server.allocated': self.allocated_size(), }
+        for category,ld in self.get_latencies().items():
+            for name,v in ld.items():
+                stats['storage_server.latencies.%s.%s' % (category, name)] = v
+        writeable = True
+        if self.readonly_storage:
+            writeable = False
+        try:
+            s = os.statvfs(self.storedir)
+            disk_total = s.f_bsize * s.f_blocks
+            disk_used = s.f_bsize * (s.f_blocks - s.f_bfree)
+            # spacetime predictors should look at the slope of disk_used.
+            disk_avail = s.f_bsize * s.f_bavail # available to non-root users
+            # include our local policy here: if we stop accepting shares when
+            # the available space drops below 1GB, then include that fact in
+            # disk_avail.
+            disk_avail -= self.reserved_space
+            disk_avail = max(disk_avail, 0)
+            if self.readonly_storage:
+                disk_avail = 0
+            if disk_avail == 0:
+                writeable = False
+
+            # spacetime predictors should use disk_avail / (d(disk_used)/dt)
+            stats["storage_server.disk_total"] = disk_total
+            stats["storage_server.disk_used"] = disk_used
+            stats["storage_server.disk_avail"] = disk_avail
+        except AttributeError:
+            # os.statvfs is available only on unix
+            pass
+        stats["storage_server.accepting_immutable_shares"] = int(writeable)
+        return stats
+
+
+    def stat_disk(self, d):
+        s = os.statvfs(d)
+        # s.f_bavail: available to non-root users
+        disk_avail = s.f_bsize * s.f_bavail
+        return disk_avail
+
+    def get_available_space(self):
+        # returns None if it cannot be measured (windows)
+        try:
+            disk_avail = self.stat_disk(self.storedir)
+            disk_avail -= self.reserved_space
+        except AttributeError:
+            disk_avail = None
+        if self.readonly_storage:
+            disk_avail = 0
+        return disk_avail
+
+    def allocated_size(self):
+        space = 0
+        for bw in self._active_writers:
+            space += bw.allocated_size()
+        return space
+
+    def remote_get_version(self):
+        remaining_space = self.get_available_space()
+        if remaining_space is None:
+            # we're on a platform that doesn't have 'df', so make a vague
+            # guess.
+            remaining_space = 2**64
+        version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
+                    { "maximum-immutable-share-size": remaining_space,
+                      "tolerates-immutable-read-overrun": True,
+                      "delete-mutable-shares-with-zero-length-writev": True,
+                      },
+                    "application-version": str(allmydata.__full_version__),
+                    }
+        return version
+
+    def remote_allocate_buckets(self, storage_index,
+                                renew_secret, cancel_secret,
+                                sharenums, allocated_size,
+                                canary, owner_num=0):
+        # owner_num is not for clients to set, but rather it should be
+        # curried into the PersonalStorageServer instance that is dedicated
+        # to a particular owner.
+        start = time.time()
+        self.count("allocate")
+        alreadygot = set()
+        bucketwriters = {} # k: shnum, v: BucketWriter
+        si_dir = storage_index_to_dir(storage_index)
+        si_s = si_b2a(storage_index)
+
+        log.msg("storage: allocate_buckets %s" % si_s)
+
+        # in this implementation, the lease information (including secrets)
+        # goes into the share files themselves. It could also be put into a
+        # separate database. Note that the lease should not be added until
+        # the BucketWriter has been closed.
+        expire_time = time.time() + 31*24*60*60
+        lease_info = LeaseInfo(owner_num,
+                               renew_secret, cancel_secret,
+                               expire_time, self.my_nodeid)
+
+        max_space_per_bucket = allocated_size
+
+        remaining_space = self.get_available_space()
+        limited = remaining_space is not None
+        if limited:
+            # this is a bit conservative, since some of this allocated_size()
+            # has already been written to disk, where it will show up in
+            # get_available_space.
+            remaining_space -= self.allocated_size()
+
+        # fill alreadygot with all shares that we have, not just the ones
+        # they asked about: this will save them a lot of work. Add or update
+        # leases for all of them: if they want us to hold shares for this
+        # file, they'll want us to hold leases for this file.
+        for (shnum, fn) in self._get_bucket_shares(storage_index):
+            alreadygot.add(shnum)
+            sf = ShareFile(fn)
+            sf.add_or_renew_lease(lease_info)
+
+        # self.readonly_storage causes remaining_space=0
+
+        for shnum in sharenums:
+            incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
+            finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
+            if os.path.exists(finalhome):
+                # great! we already have it. easy.
+                pass
+            elif os.path.exists(incominghome):
+                # Note that we don't create BucketWriters for shnums that
+                # have a partial share (in incoming/), so if a second upload
+                # occurs while the first is still in progress, the second
+                # uploader will use different storage servers.
+                pass
+            elif (not limited) or (remaining_space >= max_space_per_bucket):
+                # ok! we need to create the new share file.
+                bw = BucketWriter(self, incominghome, finalhome,
+                                  max_space_per_bucket, lease_info, canary)
+                if self.no_storage:
+                    bw.throw_out_all_data = True
+                bucketwriters[shnum] = bw
+                self._active_writers[bw] = 1
+                if limited:
+                    remaining_space -= max_space_per_bucket
+            else:
+                # bummer! not enough space to accept this bucket
+                pass
+
+        if bucketwriters:
+            fileutil.make_dirs(os.path.join(self.sharedir, si_dir))
+
+        self.add_latency("allocate", time.time() - start)
+        return alreadygot, bucketwriters
+
+    def _iter_share_files(self, storage_index):
+        for shnum, filename in self._get_bucket_shares(storage_index):
+            f = open(filename, 'rb')
+            header = f.read(32)
+            f.close()
+            if header[:32] == MutableShareFile.MAGIC:
+                sf = MutableShareFile(filename, self)
+                # note: if the share has been migrated, the renew_lease()
+                # call will throw an exception, with information to help the
+                # client update the lease.
+            elif header[:4] == struct.pack(">L", 1):
+                sf = ShareFile(filename)
+            else:
+                continue # non-sharefile
+            yield sf
+
+    def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
+                         owner_num=1):
+        start = time.time()
+        self.count("add-lease")
+        new_expire_time = time.time() + 31*24*60*60
+        lease_info = LeaseInfo(owner_num,
+                               renew_secret, cancel_secret,
+                               new_expire_time, self.my_nodeid)
+        for sf in self._iter_share_files(storage_index):
+            sf.add_or_renew_lease(lease_info)
+        self.add_latency("add-lease", time.time() - start)
+        return None
+
+    def remote_renew_lease(self, storage_index, renew_secret):
+        start = time.time()
+        self.count("renew")
+        new_expire_time = time.time() + 31*24*60*60
+        found_buckets = False
+        for sf in self._iter_share_files(storage_index):
+            found_buckets = True
+            sf.renew_lease(renew_secret, new_expire_time)
+        self.add_latency("renew", time.time() - start)
+        if not found_buckets:
+            raise IndexError("no such lease to renew")
+
+    def remote_cancel_lease(self, storage_index, cancel_secret):
+        start = time.time()
+        self.count("cancel")
+
+        total_space_freed = 0
+        found_buckets = False
+        for sf in self._iter_share_files(storage_index):
+            # note: if we can't find a lease on one share, we won't bother
+            # looking in the others. Unless something broke internally
+            # (perhaps we ran out of disk space while adding a lease), the
+            # leases on all shares will be identical.
+            found_buckets = True
+            # this raises IndexError if the lease wasn't present XXXX
+            total_space_freed += sf.cancel_lease(cancel_secret)
+
+        if found_buckets:
+            storagedir = os.path.join(self.sharedir,
+                                      storage_index_to_dir(storage_index))
+            if not os.listdir(storagedir):
+                os.rmdir(storagedir)
+
+        if self.stats_provider:
+            self.stats_provider.count('storage_server.bytes_freed',
+                                      total_space_freed)
+        self.add_latency("cancel", time.time() - start)
+        if not found_buckets:
+            raise IndexError("no such storage index")
+
+    def bucket_writer_closed(self, bw, consumed_size):
+        if self.stats_provider:
+            self.stats_provider.count('storage_server.bytes_added', consumed_size)
+        del self._active_writers[bw]
+
+    def _get_bucket_shares(self, storage_index):
+        """Return a list of (shnum, pathname) tuples for files that hold
+        shares for this storage_index. In each tuple, 'shnum' will always be
+        the integer form of the last component of 'pathname'."""
+        storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
+        try:
+            for f in os.listdir(storagedir):
+                if NUM_RE.match(f):
+                    filename = os.path.join(storagedir, f)
+                    yield (int(f), filename)
+        except OSError:
+            # Commonly caused by there being no buckets at all.
+            pass
+
+    def remote_get_buckets(self, storage_index):
+        start = time.time()
+        self.count("get")
+        si_s = si_b2a(storage_index)
+        log.msg("storage: get_buckets %s" % si_s)
+        bucketreaders = {} # k: sharenum, v: BucketReader
+        for shnum, filename in self._get_bucket_shares(storage_index):
+            bucketreaders[shnum] = BucketReader(self, filename,
+                                                storage_index, shnum)
+        self.add_latency("get", time.time() - start)
+        return bucketreaders
+
+    def get_leases(self, storage_index):
+        """Provide an iterator that yields all of the leases attached to this
+        bucket. Each lease is returned as a tuple of (owner_num,
+        renew_secret, cancel_secret, expiration_time).
+
+        This method is not for client use.
+        """
+
+        # since all shares get the same lease data, we just grab the leases
+        # from the first share
+        try:
+            shnum, filename = self._get_bucket_shares(storage_index).next()
+            sf = ShareFile(filename)
+            return sf.iter_leases()
+        except StopIteration:
+            return iter([])
+
+    def remote_slot_testv_and_readv_and_writev(self, storage_index,
+                                               secrets,
+                                               test_and_write_vectors,
+                                               read_vector):
+        start = time.time()
+        self.count("writev")
+        si_s = si_b2a(storage_index)
+        lp = log.msg("storage: slot_writev %s" % si_s)
+        si_dir = storage_index_to_dir(storage_index)
+        (write_enabler, renew_secret, cancel_secret) = secrets
+        # shares exist if there is a file for them
+        bucketdir = os.path.join(self.sharedir, si_dir)
+        shares = {}
+        if os.path.isdir(bucketdir):
+            for sharenum_s in os.listdir(bucketdir):
+                try:
+                    sharenum = int(sharenum_s)
+                except ValueError:
+                    continue
+                filename = os.path.join(bucketdir, sharenum_s)
+                msf = MutableShareFile(filename, self)
+                msf.check_write_enabler(write_enabler, si_s)
+                shares[sharenum] = msf
+        # write_enabler is good for all existing shares.
+
+        # Now evaluate test vectors.
+        testv_is_good = True
+        for sharenum in test_and_write_vectors:
+            (testv, datav, new_length) = test_and_write_vectors[sharenum]
+            if sharenum in shares:
+                if not shares[sharenum].check_testv(testv):
+                    self.log("testv failed: [%d]: %r" % (sharenum, testv))
+                    testv_is_good = False
+                    break
+            else:
+                # compare the vectors against an empty share, in which all
+                # reads return empty strings.
+                if not EmptyShare().check_testv(testv):
+                    self.log("testv failed (empty): [%d] %r" % (sharenum,
+                                                                testv))
+                    testv_is_good = False
+                    break
+
+        # now gather the read vectors, before we do any writes
+        read_data = {}
+        for sharenum, share in shares.items():
+            read_data[sharenum] = share.readv(read_vector)
+
+        ownerid = 1 # TODO
+        expire_time = time.time() + 31*24*60*60   # one month
+        lease_info = LeaseInfo(ownerid,
+                               renew_secret, cancel_secret,
+                               expire_time, self.my_nodeid)
+
+        if testv_is_good:
+            # now apply the write vectors
+            for sharenum in test_and_write_vectors:
+                (testv, datav, new_length) = test_and_write_vectors[sharenum]
+                if new_length == 0:
+                    if sharenum in shares:
+                        shares[sharenum].unlink()
+                else:
+                    if sharenum not in shares:
+                        # allocate a new share
+                        allocated_size = 2000 # arbitrary, really
+                        share = self._allocate_slot_share(bucketdir, secrets,
+                                                          sharenum,
+                                                          allocated_size,
+                                                          owner_num=0)
+                        shares[sharenum] = share
+                    shares[sharenum].writev(datav, new_length)
+                    # and update the lease
+                    shares[sharenum].add_or_renew_lease(lease_info)
+
+            if new_length == 0:
+                # delete empty bucket directories
+                if not os.listdir(bucketdir):
+                    os.rmdir(bucketdir)
+
+
+        # all done
+        self.add_latency("writev", time.time() - start)
+        return (testv_is_good, read_data)
+
+    def _allocate_slot_share(self, bucketdir, secrets, sharenum,
+                             allocated_size, owner_num=0):
+        (write_enabler, renew_secret, cancel_secret) = secrets
+        my_nodeid = self.my_nodeid
+        fileutil.make_dirs(bucketdir)
+        filename = os.path.join(bucketdir, "%d" % sharenum)
+        share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
+                                         self)
+        return share
+
+    def remote_slot_readv(self, storage_index, shares, readv):
+        start = time.time()
+        self.count("readv")
+        si_s = si_b2a(storage_index)
+        lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
+                     facility="tahoe.storage", level=log.OPERATIONAL)
+        si_dir = storage_index_to_dir(storage_index)
+        # shares exist if there is a file for them
+        bucketdir = os.path.join(self.sharedir, si_dir)
+        if not os.path.isdir(bucketdir):
+            self.add_latency("readv", time.time() - start)
+            return {}
+        datavs = {}
+        for sharenum_s in os.listdir(bucketdir):
+            try:
+                sharenum = int(sharenum_s)
+            except ValueError:
+                continue
+            if sharenum in shares or not shares:
+                filename = os.path.join(bucketdir, sharenum_s)
+                msf = MutableShareFile(filename, self)
+                datavs[sharenum] = msf.readv(readv)
+        log.msg("returning shares %s" % (datavs.keys(),),
+                facility="tahoe.storage", level=log.NOISY, parent=lp)
+        self.add_latency("readv", time.time() - start)
+        return datavs
+
+    def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
+                                    reason):
+        fileutil.make_dirs(self.corruption_advisory_dir)
+        now = time_format.iso_utc(sep="T")
+        si_s = base32.b2a(storage_index)
+        # windows can't handle colons in the filename
+        fn = os.path.join(self.corruption_advisory_dir,
+                          "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
+        f = open(fn, "w")
+        f.write("report: Share Corruption\n")
+        f.write("type: %s\n" % share_type)
+        f.write("storage_index: %s\n" % si_s)
+        f.write("share_number: %d\n" % shnum)
+        f.write("\n")
+        f.write(reason)
+        f.write("\n")
+        f.close()
+        log.msg(format=("client claims corruption in (%(share_type)s) " +
+                        "%(si)s-%(shnum)d: %(reason)s"),
+                share_type=share_type, si=si_s, shnum=shnum, reason=reason,
+                level=log.SCARY, umid="SGx2fA")
+        return None
+
index f176cdb0d6847ec89aaf3148f3b721d1562cd6df..fd9cb1bb981efe8b788da1b99c746cce3a3e6e76 100644 (file)
@@ -13,7 +13,7 @@ from allmydata.interfaces import IURI, IMutableFileNode, IFileNode, \
 from allmydata.check_results import CheckResults, CheckAndRepairResults, \
      DeepCheckResults, DeepCheckAndRepairResults
 from allmydata.mutable.common import CorruptShareError
-from allmydata.storage import storage_index_to_dir
+from allmydata.storage.server import storage_index_to_dir
 from allmydata.util import hashutil, log, fileutil, pollmixin
 from allmydata.util.assertutil import precondition
 from allmydata.stats import StatsGathererService
index 9cc7c11580705b9c842399762fdfafdc55fdeb6e..1380720bc85e5b0c14f9364f7c2e8da5658bebd7 100644 (file)
@@ -21,7 +21,7 @@ from foolscap.eventual import fireEventually
 from base64 import b32encode
 from allmydata import uri as tahoe_uri
 from allmydata.client import Client
-from allmydata.storage import StorageServer, storage_index_to_dir
+from allmydata.storage.server import StorageServer, storage_index_to_dir
 from allmydata.util import fileutil, idlib, hashutil, rrefutil
 from allmydata.introducer.client import RemoteServiceConnector
 
index 966547fc87bdb34ea2af61b778995bdd653cf1ce..998cea5413a15386429eaec5b94affb16d735926 100644 (file)
@@ -5,7 +5,8 @@
 
 import os
 from twisted.trial import unittest
-from allmydata import uri, storage
+from allmydata import uri
+from allmydata.storage.server import storage_index_to_dir
 from allmydata.util import base32, fileutil
 from allmydata.immutable import upload
 from allmydata.test.no_network import GridTestMixin
@@ -90,7 +91,7 @@ class DownloadTest(GridTestMixin, unittest.TestCase):
             f.write('immutable_uri = "%s"\n' % ur.uri)
             f.write('immutable_shares = {\n')
             si = uri.from_string(ur.uri).get_storage_index()
-            si_dir = storage.storage_index_to_dir(si)
+            si_dir = storage_index_to_dir(si)
             for (i,ss,ssdir) in self.iterate_servers():
                 sharedir = os.path.join(ssdir, "shares", si_dir)
                 shares = {}
@@ -116,7 +117,7 @@ class DownloadTest(GridTestMixin, unittest.TestCase):
             f.write('mutable_uri = "%s"\n' % n.get_uri())
             f.write('mutable_shares = {\n')
             si = uri.from_string(n.get_uri()).get_storage_index()
-            si_dir = storage.storage_index_to_dir(si)
+            si_dir = storage_index_to_dir(si)
             for (i,ss,ssdir) in self.iterate_servers():
                 sharedir = os.path.join(ssdir, "shares", si_dir)
                 shares = {}
@@ -146,7 +147,7 @@ class DownloadTest(GridTestMixin, unittest.TestCase):
         # this uses the data generated by create_shares() to populate the
         # storage servers with pre-generated shares
         si = uri.from_string(immutable_uri).get_storage_index()
-        si_dir = storage.storage_index_to_dir(si)
+        si_dir = storage_index_to_dir(si)
         for i in immutable_shares:
             shares = immutable_shares[i]
             for shnum in shares:
@@ -158,7 +159,7 @@ class DownloadTest(GridTestMixin, unittest.TestCase):
                 f.close()
 
         si = uri.from_string(mutable_uri).get_storage_index()
-        si_dir = storage.storage_index_to_dir(si)
+        si_dir = storage_index_to_dir(si)
         for i in mutable_shares:
             shares = mutable_shares[i]
             for shnum in shares:
index 1dff58275768d3bc7a5f6134f83b9bb264cd8bf7..33cfe35191e617c5055159d401d6a5cc7bb49cdd 100644 (file)
@@ -5,7 +5,7 @@ from twisted.application import service
 from foolscap import Tub, eventual
 from foolscap.logging import log
 
-from allmydata import storage
+from allmydata.storage.server import si_b2a
 from allmydata.immutable import offloaded, upload
 from allmydata import uri
 from allmydata.util import hashutil, fileutil, mathutil
@@ -163,7 +163,7 @@ class AssistedUpload(unittest.TestCase):
         assert len(key) == 16
         encryptor = AES(key)
         SI = hashutil.storage_index_hash(key)
-        SI_s = storage.si_b2a(SI)
+        SI_s = si_b2a(SI)
         encfile = os.path.join(self.basedir, "CHK_encoding", SI_s)
         f = open(encfile, "wb")
         f.write(encryptor.process(DATA))
index 4d62f165e0c5bee05085498378d089fb2ec9aa35..7da3c018fef5d92e9aaef2f71afdc2111e658059 100644 (file)
@@ -4,7 +4,8 @@ from cStringIO import StringIO
 from twisted.trial import unittest
 from twisted.internet import defer, reactor
 from twisted.python import failure
-from allmydata import uri, storage
+from allmydata import uri
+from allmydata.storage.server import StorageServer
 from allmydata.immutable import download
 from allmydata.util import base32, idlib
 from allmydata.util.idlib import shortnodeid_b2a
@@ -1803,7 +1804,7 @@ class LessFakeClient(FakeClient):
         for peerid in self._peerids:
             peerdir = os.path.join(basedir, idlib.shortnodeid_b2a(peerid))
             make_dirs(peerdir)
-            ss = storage.StorageServer(peerdir)
+            ss = StorageServer(peerdir)
             ss.setNodeID(peerid)
             lw = LocalWrapper(ss)
             self._connections[peerid] = lw
index 4c0544433680b985d50d49b4e32f5fff9e61b052..e8721b038d77c408df310340e826806b4614db07 100644 (file)
@@ -5,9 +5,11 @@ import time, os.path, stat
 import itertools
 from allmydata import interfaces
 from allmydata.util import fileutil, hashutil, base32
-from allmydata.storage import BucketWriter, BucketReader, \
-     StorageServer, MutableShareFile, \
-     storage_index_to_dir, DataTooLargeError, LeaseInfo
+from allmydata.storage.server import StorageServer, storage_index_to_dir
+from allmydata.storage.mutable import MutableShareFile
+from allmydata.storage.immutable import BucketWriter, BucketReader
+from allmydata.storage.common import DataTooLargeError
+from allmydata.storage.lease import LeaseInfo
 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
      ReadBucketProxy
 from allmydata.interfaces import BadWriteEnablerError
index fbd281e15a3a82c67ccf4df8a1946625bc2480bd..07a4ec55eabe0144d202eab11b0da96a3dc14323 100644 (file)
@@ -8,7 +8,9 @@ from twisted.internet import threads # CLI tests use deferToThread
 from twisted.internet.error import ConnectionDone, ConnectionLost
 from twisted.internet.interfaces import IConsumer, IPushProducer
 import allmydata
-from allmydata import uri, storage
+from allmydata import uri
+from allmydata.storage.mutable import MutableShareFile
+from allmydata.storage.server import si_a2b
 from allmydata.immutable import download, filenode, offloaded, upload
 from allmydata.util import idlib, mathutil
 from allmydata.util import log, base32
@@ -442,7 +444,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
                 assert pieces[-5].startswith("client")
                 client_num = int(pieces[-5][-1])
                 storage_index_s = pieces[-1]
-                storage_index = storage.si_a2b(storage_index_s)
+                storage_index = si_a2b(storage_index_s)
                 for sharename in filenames:
                     shnum = int(sharename)
                     filename = os.path.join(dirpath, sharename)
@@ -453,7 +455,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
         return shares
 
     def _corrupt_mutable_share(self, filename, which):
-        msf = storage.MutableShareFile(filename)
+        msf = MutableShareFile(filename)
         datav = msf.readv([ (0, 1000000) ])
         final_share = datav[0]
         assert len(final_share) < 1000000 # ought to be truncated
index cde8cba3d726323e00ae75ef0540dd5421417eec..8ea358de772fe5a42a3f58c3028819186e5a9344 100644 (file)
@@ -6,7 +6,9 @@ from twisted.trial import unittest
 from twisted.internet import defer, reactor
 from twisted.web import client, error, http
 from twisted.python import failure, log
-from allmydata import interfaces, uri, webish, storage
+from allmydata import interfaces, uri, webish
+from allmydata.storage.mutable import MutableShareFile
+from allmydata.storage.immutable import ShareFile
 from allmydata.immutable import upload, download
 from allmydata.web import status, common
 from allmydata.scripts.debug import CorruptShareOptions, corrupt_share
@@ -2947,10 +2949,10 @@ class Grid(GridTestMixin, WebErrorMixin, unittest.TestCase):
         lease_counts = []
         for shnum, serverid, fn in shares:
             if u.startswith("URI:SSK") or u.startswith("URI:DIR2"):
-                sf = storage.MutableShareFile(fn)
+                sf = MutableShareFile(fn)
                 num_leases = len(sf.debug_get_leases())
             elif u.startswith("URI:CHK"):
-                sf = storage.ShareFile(fn)
+                sf = ShareFile(fn)
                 num_leases = len(list(sf.iter_leases()))
             else:
                 raise RuntimeError("can't get leases on %s" % u)
index 776a7b74e0b4398affb052bc72f85c2528891456..297fe93c8ad033f4aba95324e187bed4acfdf98b 100644 (file)
@@ -2,7 +2,7 @@
 import re, urllib
 from zope.interface import implements
 from twisted.python.components import registerAdapter
-from allmydata import storage
+from allmydata.storage.server import si_a2b, si_b2a
 from allmydata.util import base32, hashutil
 from allmydata.interfaces import IURI, IDirnodeURI, IFileURI, IImmutableFileURI, \
     IVerifierURI, IMutableFileURI, INewDirectoryURI, IReadonlyNewDirectoryURI
@@ -136,7 +136,7 @@ class CHKFileVerifierURI(_BaseURI):
     def init_from_string(cls, uri):
         mo = cls.STRING_RE.search(uri)
         assert mo, (uri, cls, cls.STRING_RE)
-        return cls(storage.si_a2b(mo.group(1)), base32.a2b(mo.group(2)),
+        return cls(si_a2b(mo.group(1)), base32.a2b(mo.group(2)),
                    int(mo.group(3)), int(mo.group(4)), int(mo.group(5)))
 
     def to_string(self):
@@ -145,7 +145,7 @@ class CHKFileVerifierURI(_BaseURI):
         assert isinstance(self.size, (int,long))
 
         return ('URI:CHK-Verifier:%s:%s:%d:%d:%d' %
-                (storage.si_b2a(self.storage_index),
+                (si_b2a(self.storage_index),
                  base32.b2a(self.uri_extension_hash),
                  self.needed_shares,
                  self.total_shares,
@@ -308,18 +308,18 @@ class SSKVerifierURI(_BaseURI):
     def init_from_human_encoding(cls, uri):
         mo = cls.HUMAN_RE.search(uri)
         assert mo, uri
-        return cls(storage.si_a2b(mo.group(1)), base32.a2b(mo.group(2)))
+        return cls(si_a2b(mo.group(1)), base32.a2b(mo.group(2)))
 
     @classmethod
     def init_from_string(cls, uri):
         mo = cls.STRING_RE.search(uri)
         assert mo, (uri, cls)
-        return cls(storage.si_a2b(mo.group(1)), base32.a2b(mo.group(2)))
+        return cls(si_a2b(mo.group(1)), base32.a2b(mo.group(2)))
 
     def to_string(self):
         assert isinstance(self.storage_index, str)
         assert isinstance(self.fingerprint, str)
-        return 'URI:SSK-Verifier:%s:%s' % (storage.si_b2a(self.storage_index),
+        return 'URI:SSK-Verifier:%s:%s' % (si_b2a(self.storage_index),
                                            base32.b2a(self.fingerprint))
 
 class _NewDirectoryBaseURI(_BaseURI):