git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
storage: split WriteBucketProxy and ReadBucketProxy out into immutable/layout.py...
author Brian Warner <warner@allmydata.com>
Fri, 10 Oct 2008 00:08:00 +0000 (17:08 -0700)
committer Brian Warner <warner@allmydata.com>
Fri, 10 Oct 2008 00:08:00 +0000 (17:08 -0700)
src/allmydata/immutable/download.py
src/allmydata/immutable/layout.py [new file with mode: 0644]
src/allmydata/immutable/upload.py
src/allmydata/offloaded.py
src/allmydata/scripts/debug.py
src/allmydata/storage.py
src/allmydata/test/test_storage.py

diff --git a/src/allmydata/immutable/download.py b/src/allmydata/immutable/download.py
index 80b9ce65c0265ef20bb18b45dfd475663c7c573c..a2216bee9576ad456bfa02b1e60bc98513e15d36 100644 (file)
@@ -12,6 +12,7 @@ from allmydata.util.assertutil import _assert
 from allmydata import codec, hashtree, storage, uri
 from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, \
      IDownloadStatus, IDownloadResults
+from allmydata.immutable import layout
 from allmydata.immutable.encode import NotEnoughSharesError
 from pycryptopp.cipher.aes import AES
 
@@ -580,7 +581,7 @@ class FileDownloader:
                                     (self._responses_received,
                                      self._queries_sent))
         for sharenum, bucket in buckets.iteritems():
-            b = storage.ReadBucketProxy(bucket, peerid, self._si_s)
+            b = layout.ReadBucketProxy(bucket, peerid, self._si_s)
             self.add_share_bucket(sharenum, b)
             self._uri_extension_sources.append(b)
             if self._results:
diff --git a/src/allmydata/immutable/layout.py b/src/allmydata/immutable/layout.py
new file mode 100644 (file)
index 0000000..9f7be65
--- /dev/null
@@ -0,0 +1,301 @@
+
+import struct
+from zope.interface import implements
+from twisted.internet import defer
+from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
+     FileTooLargeError, HASH_SIZE
+from allmydata.util import mathutil, idlib
+from allmydata.util.assertutil import _assert, precondition
+
+
+"""
+Share data is written into a single file. At the start of the file, there is
+a series of four-byte big-endian offset values, which indicate where each
+section starts. Each offset is measured from the beginning of the file.
+
+0x00: version number (=00 00 00 01)
+0x04: segment size
+0x08: data size
+0x0c: offset of data (=00 00 00 24)
+0x10: offset of plaintext_hash_tree
+0x14: offset of crypttext_hash_tree
+0x18: offset of block_hashes
+0x1c: offset of share_hashes
+0x20: offset of uri_extension_length + uri_extension
+0x24: start of data
+?   : start of plaintext_hash_tree
+?   : start of crypttext_hash_tree
+?   : start of block_hashes
+?   : start of share_hashes
+       each share_hash is written as a two-byte (big-endian) hashnum
+       followed by the 32-byte SHA-256 hash. We only store the hashes
+       necessary to validate the share hash root
+?   : start of uri_extension_length (four-byte big-endian value)
+?   : start of uri_extension
+"""
+
+def allocated_size(data_size, num_segments, num_share_hashes,
+                   uri_extension_size):
+    wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes,
+                           uri_extension_size, None)
+    uri_extension_starts_at = wbp._offsets['uri_extension']
+    return uri_extension_starts_at + 4 + uri_extension_size
+
+class WriteBucketProxy:
+    implements(IStorageBucketWriter)
+    def __init__(self, rref, data_size, segment_size, num_segments,
+                 num_share_hashes, uri_extension_size, nodeid):
+        self._rref = rref
+        self._data_size = data_size
+        self._segment_size = segment_size
+        self._num_segments = num_segments
+        self._nodeid = nodeid
+
+        if segment_size >= 2**32 or data_size >= 2**32:
+            raise FileTooLargeError("This file is too large to be uploaded (data_size).")
+
+        effective_segments = mathutil.next_power_of_k(num_segments,2)
+        self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
+        # how many share hashes are included in each share? This will be
+        # about ln2(num_shares).
+        self._share_hash_size = num_share_hashes * (2+HASH_SIZE)
+        # we commit to not sending a uri extension larger than this
+        self._uri_extension_size = uri_extension_size
+
+        offsets = self._offsets = {}
+        x = 0x24
+        offsets['data'] = x
+        x += data_size
+        offsets['plaintext_hash_tree'] = x
+        x += self._segment_hash_size
+        offsets['crypttext_hash_tree'] = x
+        x += self._segment_hash_size
+        offsets['block_hashes'] = x
+        x += self._segment_hash_size
+        offsets['share_hashes'] = x
+        x += self._share_hash_size
+        offsets['uri_extension'] = x
+
+        if x >= 2**32:
+            raise FileTooLargeError("This file is too large to be uploaded (offsets).")
+
+        offset_data = struct.pack(">LLLLLLLLL",
+                                  1, # version number
+                                  segment_size,
+                                  data_size,
+                                  offsets['data'],
+                                  offsets['plaintext_hash_tree'],
+                                  offsets['crypttext_hash_tree'],
+                                  offsets['block_hashes'],
+                                  offsets['share_hashes'],
+                                  offsets['uri_extension'],
+                                  )
+        assert len(offset_data) == 0x24
+        self._offset_data = offset_data
+
+    def __repr__(self):
+        if self._nodeid:
+            nodeid_s = idlib.nodeid_b2a(self._nodeid)
+        else:
+            nodeid_s = "[None]"
+        return "<allmydata.storage.WriteBucketProxy for node %s>" % nodeid_s
+
+    def start(self):
+        return self._write(0, self._offset_data)
+
+    def put_block(self, segmentnum, data):
+        offset = self._offsets['data'] + segmentnum * self._segment_size
+        assert offset + len(data) <= self._offsets['uri_extension']
+        assert isinstance(data, str)
+        if segmentnum < self._num_segments-1:
+            precondition(len(data) == self._segment_size,
+                         len(data), self._segment_size)
+        else:
+            precondition(len(data) == (self._data_size -
+                                       (self._segment_size *
+                                        (self._num_segments - 1))),
+                         len(data), self._segment_size)
+        return self._write(offset, data)
+
+    def put_plaintext_hashes(self, hashes):
+        offset = self._offsets['plaintext_hash_tree']
+        assert isinstance(hashes, list)
+        data = "".join(hashes)
+        precondition(len(data) == self._segment_hash_size,
+                     len(data), self._segment_hash_size)
+        precondition(offset+len(data) <= self._offsets['crypttext_hash_tree'],
+                     offset, len(data), offset+len(data),
+                     self._offsets['crypttext_hash_tree'])
+        return self._write(offset, data)
+
+    def put_crypttext_hashes(self, hashes):
+        offset = self._offsets['crypttext_hash_tree']
+        assert isinstance(hashes, list)
+        data = "".join(hashes)
+        precondition(len(data) == self._segment_hash_size,
+                     len(data), self._segment_hash_size)
+        precondition(offset + len(data) <= self._offsets['block_hashes'],
+                     offset, len(data), offset+len(data),
+                     self._offsets['block_hashes'])
+        return self._write(offset, data)
+
+    def put_block_hashes(self, blockhashes):
+        offset = self._offsets['block_hashes']
+        assert isinstance(blockhashes, list)
+        data = "".join(blockhashes)
+        precondition(len(data) == self._segment_hash_size,
+                     len(data), self._segment_hash_size)
+        precondition(offset + len(data) <= self._offsets['share_hashes'],
+                     offset, len(data), offset+len(data),
+                     self._offsets['share_hashes'])
+        return self._write(offset, data)
+
+    def put_share_hashes(self, sharehashes):
+        # sharehashes is a list of (index, hash) tuples, so they get stored
+        # as 2+32=34 bytes each
+        offset = self._offsets['share_hashes']
+        assert isinstance(sharehashes, list)
+        data = "".join([struct.pack(">H", hashnum) + hashvalue
+                        for hashnum,hashvalue in sharehashes])
+        precondition(len(data) == self._share_hash_size,
+                     len(data), self._share_hash_size)
+        precondition(offset + len(data) <= self._offsets['uri_extension'],
+                     offset, len(data), offset+len(data),
+                     self._offsets['uri_extension'])
+        return self._write(offset, data)
+
+    def put_uri_extension(self, data):
+        offset = self._offsets['uri_extension']
+        assert isinstance(data, str)
+        precondition(len(data) <= self._uri_extension_size,
+                     len(data), self._uri_extension_size)
+        length = struct.pack(">L", len(data))
+        return self._write(offset, length+data)
+
+    def _write(self, offset, data):
+        # TODO: for small shares, buffer the writes and do just a single call
+        return self._rref.callRemote("write", offset, data)
+
+    def close(self):
+        return self._rref.callRemote("close")
+
+    def abort(self):
+        return self._rref.callRemoteOnly("abort")
+
+class ReadBucketProxy:
+    implements(IStorageBucketReader)
+    def __init__(self, rref, peerid=None, storage_index_s=None):
+        self._rref = rref
+        self._peerid = peerid
+        self._si_s = storage_index_s
+        self._started = False
+
+    def get_peerid(self):
+        return self._peerid
+
+    def __repr__(self):
+        peerid_s = idlib.shortnodeid_b2a(self._peerid)
+        return "<ReadBucketProxy to peer [%s] SI %s>" % (peerid_s,
+                                                         self._si_s)
+
+    def startIfNecessary(self):
+        if self._started:
+            return defer.succeed(self)
+        d = self.start()
+        d.addCallback(lambda res: self)
+        return d
+
+    def start(self):
+        # TODO: for small shares, read the whole bucket in start()
+        d = self._read(0, 0x24)
+        d.addCallback(self._parse_offsets)
+        def _started(res):
+            self._started = True
+            return res
+        d.addCallback(_started)
+        return d
+
+    def _parse_offsets(self, data):
+        precondition(len(data) == 0x24)
+        self._offsets = {}
+        (version, self._segment_size, self._data_size) = \
+                  struct.unpack(">LLL", data[0:0xc])
+        _assert(version == 1)
+        x = 0x0c
+        for field in ( 'data',
+                       'plaintext_hash_tree',
+                       'crypttext_hash_tree',
+                       'block_hashes',
+                       'share_hashes',
+                       'uri_extension',
+                       ):
+            offset = struct.unpack(">L", data[x:x+4])[0]
+            x += 4
+            self._offsets[field] = offset
+        return self._offsets
+
+    def get_block(self, blocknum):
+        num_segments = mathutil.div_ceil(self._data_size, self._segment_size)
+        if blocknum < num_segments-1:
+            size = self._segment_size
+        else:
+            size = self._data_size % self._segment_size
+            if size == 0:
+                size = self._segment_size
+        offset = self._offsets['data'] + blocknum * self._segment_size
+        return self._read(offset, size)
+
+    def _str2l(self, s):
+        """ split string (pulled from storage) into a list of blockids """
+        return [ s[i:i+HASH_SIZE]
+                 for i in range(0, len(s), HASH_SIZE) ]
+
+    def get_plaintext_hashes(self):
+        offset = self._offsets['plaintext_hash_tree']
+        size = self._offsets['crypttext_hash_tree'] - offset
+        d = self._read(offset, size)
+        d.addCallback(self._str2l)
+        return d
+
+    def get_crypttext_hashes(self):
+        offset = self._offsets['crypttext_hash_tree']
+        size = self._offsets['block_hashes'] - offset
+        d = self._read(offset, size)
+        d.addCallback(self._str2l)
+        return d
+
+    def get_block_hashes(self):
+        offset = self._offsets['block_hashes']
+        size = self._offsets['share_hashes'] - offset
+        d = self._read(offset, size)
+        d.addCallback(self._str2l)
+        return d
+
+    def get_share_hashes(self):
+        offset = self._offsets['share_hashes']
+        size = self._offsets['uri_extension'] - offset
+        assert size % (2+HASH_SIZE) == 0
+        d = self._read(offset, size)
+        def _unpack_share_hashes(data):
+            assert len(data) == size
+            hashes = []
+            for i in range(0, size, 2+HASH_SIZE):
+                hashnum = struct.unpack(">H", data[i:i+2])[0]
+                hashvalue = data[i+2:i+2+HASH_SIZE]
+                hashes.append( (hashnum, hashvalue) )
+            return hashes
+        d.addCallback(_unpack_share_hashes)
+        return d
+
+    def get_uri_extension(self):
+        offset = self._offsets['uri_extension']
+        d = self._read(offset, 4)
+        def _got_length(data):
+            length = struct.unpack(">L", data)[0]
+            return self._read(offset+4, length)
+        d.addCallback(_got_length)
+        return d
+
+    def _read(self, offset, length):
+        return self._rref.callRemote("read", offset, length)
+
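
As a sanity check on the layout documented in the new module's docstring, the nine-field header and the allocated_size() result can be reproduced with nothing but the standard library. This is only an illustrative sketch, not part of the commit: the sizes are made-up example values, and it assumes num_segments is already a power of two so that mathutil.next_power_of_k() would be a no-op.

import struct

HASH_SIZE = 32             # 32-byte SHA-256 hashes, per the docstring above
data_size = 1000           # example values only
segment_size = 128
num_segments = 8           # assumed to be a power of two
num_share_hashes = 4       # roughly ln2(total shares)
uri_extension_size = 300

segment_hash_size = (2 * num_segments - 1) * HASH_SIZE
share_hash_size = num_share_hashes * (2 + HASH_SIZE)  # 2-byte hashnum + 32-byte hash

offsets = {}
x = 0x24                   # the header itself: nine 4-byte big-endian fields
offsets['data'] = x
x += data_size
offsets['plaintext_hash_tree'] = x
x += segment_hash_size
offsets['crypttext_hash_tree'] = x
x += segment_hash_size
offsets['block_hashes'] = x
x += segment_hash_size
offsets['share_hashes'] = x
x += share_hash_size
offsets['uri_extension'] = x

header = struct.pack(">LLLLLLLLL",
                     1, segment_size, data_size,
                     offsets['data'], offsets['plaintext_hash_tree'],
                     offsets['crypttext_hash_tree'], offsets['block_hashes'],
                     offsets['share_hashes'], offsets['uri_extension'])
assert len(header) == 0x24

# uri_extension offset + 4-byte length prefix + reserved extension space
allocated = offsets['uri_extension'] + 4 + uri_extension_size

allocated_size() in layout.py arrives at the same number by instantiating a WriteBucketProxy with a None remote reference and reading back its _offsets dict.
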
diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py
index cf275b899b1c6f4a78f48f01d79cfb76f7c1ce86..40fed09b7f8225b761f91c4e62bee70977dd2a8b 100644 (file)
@@ -18,6 +18,7 @@ from allmydata.util import base32, idlib, mathutil
 from allmydata.util.assertutil import precondition
 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
      IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus
+from allmydata.immutable import layout
 from pycryptopp.cipher.aes import AES
 
 from cStringIO import StringIO
@@ -76,10 +77,10 @@ class PeerTracker:
         self._storageserver = storage_server # to an RIStorageServer
         self.buckets = {} # k: shareid, v: IRemoteBucketWriter
         self.sharesize = sharesize
-        self.allocated_size = storage.allocated_size(sharesize,
-                                                     num_segments,
-                                                     num_share_hashes,
-                                                     EXTENSION_SIZE)
+        self.allocated_size = layout.allocated_size(sharesize,
+                                                    num_segments,
+                                                    num_share_hashes,
+                                                    EXTENSION_SIZE)
 
         self.blocksize = blocksize
         self.num_segments = num_segments
@@ -109,12 +110,12 @@ class PeerTracker:
         #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
         b = {}
         for sharenum, rref in buckets.iteritems():
-            bp = storage.WriteBucketProxy(rref, self.sharesize,
-                                          self.blocksize,
-                                          self.num_segments,
-                                          self.num_share_hashes,
-                                          EXTENSION_SIZE,
-                                          self.peerid)
+            bp = layout.WriteBucketProxy(rref, self.sharesize,
+                                         self.blocksize,
+                                         self.num_segments,
+                                         self.num_share_hashes,
+                                         EXTENSION_SIZE,
+                                         self.peerid)
             b[sharenum] = bp
         self.buckets.update(b)
         return (alreadygot, set(b.keys()))
diff --git a/src/allmydata/offloaded.py b/src/allmydata/offloaded.py
index f2a8afadb01135a5ae60e3f8278b59b3741b3eaa..52a9f4fec5cebca3ce8266c526502a2162b929ea 100644 (file)
@@ -7,6 +7,7 @@ from foolscap import Referenceable, DeadReferenceError
 from foolscap.eventual import eventually
 from allmydata import interfaces, storage, uri
 from allmydata.immutable import upload
+from allmydata.immutable.layout import ReadBucketProxy
 from allmydata.util import idlib, log, observer, fileutil, hashutil
 
 
@@ -85,8 +86,7 @@ class CHKCheckerAndUEBFetcher:
             self.log("no readers, so no UEB", level=log.NOISY)
             return
         b,peerid = self._readers.pop()
-        rbp = storage.ReadBucketProxy(b, peerid,
-                                      storage.si_b2a(self._storage_index))
+        rbp = ReadBucketProxy(b, peerid, storage.si_b2a(self._storage_index))
         d = rbp.startIfNecessary()
         d.addCallback(lambda res: rbp.get_uri_extension())
         d.addCallback(self._got_uri_extension)
diff --git a/src/allmydata/scripts/debug.py b/src/allmydata/scripts/debug.py
index 243e49d8e47094a596c3a6c71a871ad4a2a1e704..863e1e656673681a562db8ff48002e44b626e263 100644 (file)
@@ -47,11 +47,12 @@ def dump_share(options):
 def dump_immutable_share(options):
     from allmydata import uri, storage
     from allmydata.util import base32
+    from allmydata.immutable.layout import ReadBucketProxy
 
     out = options.stdout
     f = storage.ShareFile(options['filename'])
     # use a ReadBucketProxy to parse the bucket and find the uri extension
-    bp = storage.ReadBucketProxy(None)
+    bp = ReadBucketProxy(None)
     offsets = bp._parse_offsets(f.read_share_data(0, 0x24))
     seek = offsets['uri_extension']
     length = struct.unpack(">L", f.read_share_data(seek, 4))[0]
@@ -516,6 +517,7 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out):
     from allmydata import uri, storage
     from allmydata.mutable.layout import unpack_share
     from allmydata.mutable.common import NeedMoreDataError
+    from allmydata.immutable.layout import ReadBucketProxy
     from allmydata.util import base32
     import struct
 
@@ -569,7 +571,7 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out):
 
         sf = storage.ShareFile(abs_sharefile)
         # use a ReadBucketProxy to parse the bucket and find the uri extension
-        bp = storage.ReadBucketProxy(None)
+        bp = ReadBucketProxy(None)
         offsets = bp._parse_offsets(sf.read_share_data(0, 0x24))
         seek = offsets['uri_extension']
         length = struct.unpack(">L", sf.read_share_data(seek, 4))[0]
@@ -689,7 +691,7 @@ def corrupt_share(options):
     else:
         # otherwise assume it's immutable
         f = storage.ShareFile(fn)
-        bp = storage.ReadBucketProxy(None)
+        bp = ReadBucketProxy(None)
         offsets = bp._parse_offsets(f.read_share_data(0, 0x24))
         start = f._data_offset + offsets["data"]
         end = f._data_offset + offsets["plaintext_hash_tree"]
diff --git a/src/allmydata/storage.py b/src/allmydata/storage.py
index b2e9685b55be30e65465044ac27d063ebcda5e72..29b4019dd6b258c1b705fa99b1290a84ab672d7c 100644 (file)
@@ -3,14 +3,12 @@ from distutils.version import LooseVersion
 
 from foolscap import Referenceable
 from twisted.application import service
-from twisted.internet import defer
 
 from zope.interface import implements
 from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
-     RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE, \
-     BadWriteEnablerError, IStatsProducer, FileTooLargeError
-from allmydata.util import base32, fileutil, idlib, mathutil, log
-from allmydata.util.assertutil import precondition, _assert
+     RIBucketReader, BadWriteEnablerError, IStatsProducer
+from allmydata.util import base32, fileutil, idlib, log
+from allmydata.util.assertutil import precondition
 import allmydata # for __version__
 
 class DataTooLargeError(Exception):
@@ -38,7 +36,7 @@ NUM_RE=re.compile("^[0-9]+$")
 #  0x00: share file version number, four bytes, current version is 1
 #  0x04: share data length, four bytes big-endian = A
 #  0x08: number of leases, four bytes big-endian
-#  0x0c: beginning of share data (described below, at WriteBucketProxy)
+#  0x0c: beginning of share data (see immutable.layout.WriteBucketProxy)
 #  A+0x0c = B: first lease. Lease format is:
 #   B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner
 #   B+0x04: renew secret, 32 bytes (SHA256)
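
The container format described in the comment above wraps the per-share layout that just moved to immutable/layout.py. A minimal sketch of reading that container header follows; it is illustrative only, "share.data" is a hypothetical path, and only the fields shown in the comment are parsed.

import struct

# "share.data" is a hypothetical path to an immutable share file on a storage server
with open("share.data", "rb") as f:
    # 0x00: version, 0x04: share data length (A), 0x08: lease count -- all big-endian
    version, data_length, num_leases = struct.unpack(">LLL", f.read(0x0c))
    assert version == 1
    # the share data written via immutable.layout.WriteBucketProxy follows at 0x0c
    share_data = f.read(data_length)
    # leases begin at A+0x0c; each starts with a 4-byte owner number
    first_lease_offset = 0x0c + data_length

This 0x0c header is presumably what ShareFile.read_share_data() skips over before the debug.py code shown earlier hands the bytes to ReadBucketProxy._parse_offsets().
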
@@ -1210,295 +1208,3 @@ class StorageServer(service.MultiService, Referenceable):
 
 
 # the code before here runs on the storage server, not the client
-# the code beyond here runs on the client, not the storage server
-
-"""
-Share data is written into a single file. At the start of the file, there is
-a series of four-byte big-endian offset values, which indicate where each
-section starts. Each offset is measured from the beginning of the file.
-
-0x00: version number (=00 00 00 01)
-0x04: segment size
-0x08: data size
-0x0c: offset of data (=00 00 00 24)
-0x10: offset of plaintext_hash_tree
-0x14: offset of crypttext_hash_tree
-0x18: offset of block_hashes
-0x1c: offset of share_hashes
-0x20: offset of uri_extension_length + uri_extension
-0x24: start of data
-?   : start of plaintext_hash_tree
-?   : start of crypttext_hash_tree
-?   : start of block_hashes
-?   : start of share_hashes
-       each share_hash is written as a two-byte (big-endian) hashnum
-       followed by the 32-byte SHA-256 hash. We only store the hashes
-       necessary to validate the share hash root
-?   : start of uri_extension_length (four-byte big-endian value)
-?   : start of uri_extension
-"""
-
-def allocated_size(data_size, num_segments, num_share_hashes,
-                   uri_extension_size):
-    wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes,
-                           uri_extension_size, None)
-    uri_extension_starts_at = wbp._offsets['uri_extension']
-    return uri_extension_starts_at + 4 + uri_extension_size
-
-class WriteBucketProxy:
-    implements(IStorageBucketWriter)
-    def __init__(self, rref, data_size, segment_size, num_segments,
-                 num_share_hashes, uri_extension_size, nodeid):
-        self._rref = rref
-        self._data_size = data_size
-        self._segment_size = segment_size
-        self._num_segments = num_segments
-        self._nodeid = nodeid
-
-        if segment_size >= 2**32 or data_size >= 2**32:
-            raise FileTooLargeError("This file is too large to be uploaded (data_size).")
-
-        effective_segments = mathutil.next_power_of_k(num_segments,2)
-        self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
-        # how many share hashes are included in each share? This will be
-        # about ln2(num_shares).
-        self._share_hash_size = num_share_hashes * (2+HASH_SIZE)
-        # we commit to not sending a uri extension larger than this
-        self._uri_extension_size = uri_extension_size
-
-        offsets = self._offsets = {}
-        x = 0x24
-        offsets['data'] = x
-        x += data_size
-        offsets['plaintext_hash_tree'] = x
-        x += self._segment_hash_size
-        offsets['crypttext_hash_tree'] = x
-        x += self._segment_hash_size
-        offsets['block_hashes'] = x
-        x += self._segment_hash_size
-        offsets['share_hashes'] = x
-        x += self._share_hash_size
-        offsets['uri_extension'] = x
-
-        if x >= 2**32:
-            raise FileTooLargeError("This file is too large to be uploaded (offsets).")
-
-        offset_data = struct.pack(">LLLLLLLLL",
-                                  1, # version number
-                                  segment_size,
-                                  data_size,
-                                  offsets['data'],
-                                  offsets['plaintext_hash_tree'],
-                                  offsets['crypttext_hash_tree'],
-                                  offsets['block_hashes'],
-                                  offsets['share_hashes'],
-                                  offsets['uri_extension'],
-                                  )
-        assert len(offset_data) == 0x24
-        self._offset_data = offset_data
-
-    def __repr__(self):
-        if self._nodeid:
-            nodeid_s = idlib.nodeid_b2a(self._nodeid)
-        else:
-            nodeid_s = "[None]"
-        return "<allmydata.storage.WriteBucketProxy for node %s>" % nodeid_s
-
-    def start(self):
-        return self._write(0, self._offset_data)
-
-    def put_block(self, segmentnum, data):
-        offset = self._offsets['data'] + segmentnum * self._segment_size
-        assert offset + len(data) <= self._offsets['uri_extension']
-        assert isinstance(data, str)
-        if segmentnum < self._num_segments-1:
-            precondition(len(data) == self._segment_size,
-                         len(data), self._segment_size)
-        else:
-            precondition(len(data) == (self._data_size -
-                                       (self._segment_size *
-                                        (self._num_segments - 1))),
-                         len(data), self._segment_size)
-        return self._write(offset, data)
-
-    def put_plaintext_hashes(self, hashes):
-        offset = self._offsets['plaintext_hash_tree']
-        assert isinstance(hashes, list)
-        data = "".join(hashes)
-        precondition(len(data) == self._segment_hash_size,
-                     len(data), self._segment_hash_size)
-        precondition(offset+len(data) <= self._offsets['crypttext_hash_tree'],
-                     offset, len(data), offset+len(data),
-                     self._offsets['crypttext_hash_tree'])
-        return self._write(offset, data)
-
-    def put_crypttext_hashes(self, hashes):
-        offset = self._offsets['crypttext_hash_tree']
-        assert isinstance(hashes, list)
-        data = "".join(hashes)
-        precondition(len(data) == self._segment_hash_size,
-                     len(data), self._segment_hash_size)
-        precondition(offset + len(data) <= self._offsets['block_hashes'],
-                     offset, len(data), offset+len(data),
-                     self._offsets['block_hashes'])
-        return self._write(offset, data)
-
-    def put_block_hashes(self, blockhashes):
-        offset = self._offsets['block_hashes']
-        assert isinstance(blockhashes, list)
-        data = "".join(blockhashes)
-        precondition(len(data) == self._segment_hash_size,
-                     len(data), self._segment_hash_size)
-        precondition(offset + len(data) <= self._offsets['share_hashes'],
-                     offset, len(data), offset+len(data),
-                     self._offsets['share_hashes'])
-        return self._write(offset, data)
-
-    def put_share_hashes(self, sharehashes):
-        # sharehashes is a list of (index, hash) tuples, so they get stored
-        # as 2+32=34 bytes each
-        offset = self._offsets['share_hashes']
-        assert isinstance(sharehashes, list)
-        data = "".join([struct.pack(">H", hashnum) + hashvalue
-                        for hashnum,hashvalue in sharehashes])
-        precondition(len(data) == self._share_hash_size,
-                     len(data), self._share_hash_size)
-        precondition(offset + len(data) <= self._offsets['uri_extension'],
-                     offset, len(data), offset+len(data),
-                     self._offsets['uri_extension'])
-        return self._write(offset, data)
-
-    def put_uri_extension(self, data):
-        offset = self._offsets['uri_extension']
-        assert isinstance(data, str)
-        precondition(len(data) <= self._uri_extension_size,
-                     len(data), self._uri_extension_size)
-        length = struct.pack(">L", len(data))
-        return self._write(offset, length+data)
-
-    def _write(self, offset, data):
-        # TODO: for small shares, buffer the writes and do just a single call
-        return self._rref.callRemote("write", offset, data)
-
-    def close(self):
-        return self._rref.callRemote("close")
-
-    def abort(self):
-        return self._rref.callRemoteOnly("abort")
-
-class ReadBucketProxy:
-    implements(IStorageBucketReader)
-    def __init__(self, rref, peerid=None, storage_index_s=None):
-        self._rref = rref
-        self._peerid = peerid
-        self._si_s = storage_index_s
-        self._started = False
-
-    def get_peerid(self):
-        return self._peerid
-
-    def __repr__(self):
-        peerid_s = idlib.shortnodeid_b2a(self._peerid)
-        return "<ReadBucketProxy to peer [%s] SI %s>" % (peerid_s,
-                                                         self._si_s)
-
-    def startIfNecessary(self):
-        if self._started:
-            return defer.succeed(self)
-        d = self.start()
-        d.addCallback(lambda res: self)
-        return d
-
-    def start(self):
-        # TODO: for small shares, read the whole bucket in start()
-        d = self._read(0, 0x24)
-        d.addCallback(self._parse_offsets)
-        def _started(res):
-            self._started = True
-            return res
-        d.addCallback(_started)
-        return d
-
-    def _parse_offsets(self, data):
-        precondition(len(data) == 0x24)
-        self._offsets = {}
-        (version, self._segment_size, self._data_size) = \
-                  struct.unpack(">LLL", data[0:0xc])
-        _assert(version == 1)
-        x = 0x0c
-        for field in ( 'data',
-                       'plaintext_hash_tree',
-                       'crypttext_hash_tree',
-                       'block_hashes',
-                       'share_hashes',
-                       'uri_extension',
-                       ):
-            offset = struct.unpack(">L", data[x:x+4])[0]
-            x += 4
-            self._offsets[field] = offset
-        return self._offsets
-
-    def get_block(self, blocknum):
-        num_segments = mathutil.div_ceil(self._data_size, self._segment_size)
-        if blocknum < num_segments-1:
-            size = self._segment_size
-        else:
-            size = self._data_size % self._segment_size
-            if size == 0:
-                size = self._segment_size
-        offset = self._offsets['data'] + blocknum * self._segment_size
-        return self._read(offset, size)
-
-    def _str2l(self, s):
-        """ split string (pulled from storage) into a list of blockids """
-        return [ s[i:i+HASH_SIZE]
-                 for i in range(0, len(s), HASH_SIZE) ]
-
-    def get_plaintext_hashes(self):
-        offset = self._offsets['plaintext_hash_tree']
-        size = self._offsets['crypttext_hash_tree'] - offset
-        d = self._read(offset, size)
-        d.addCallback(self._str2l)
-        return d
-
-    def get_crypttext_hashes(self):
-        offset = self._offsets['crypttext_hash_tree']
-        size = self._offsets['block_hashes'] - offset
-        d = self._read(offset, size)
-        d.addCallback(self._str2l)
-        return d
-
-    def get_block_hashes(self):
-        offset = self._offsets['block_hashes']
-        size = self._offsets['share_hashes'] - offset
-        d = self._read(offset, size)
-        d.addCallback(self._str2l)
-        return d
-
-    def get_share_hashes(self):
-        offset = self._offsets['share_hashes']
-        size = self._offsets['uri_extension'] - offset
-        assert size % (2+HASH_SIZE) == 0
-        d = self._read(offset, size)
-        def _unpack_share_hashes(data):
-            assert len(data) == size
-            hashes = []
-            for i in range(0, size, 2+HASH_SIZE):
-                hashnum = struct.unpack(">H", data[i:i+2])[0]
-                hashvalue = data[i+2:i+2+HASH_SIZE]
-                hashes.append( (hashnum, hashvalue) )
-            return hashes
-        d.addCallback(_unpack_share_hashes)
-        return d
-
-    def get_uri_extension(self):
-        offset = self._offsets['uri_extension']
-        d = self._read(offset, 4)
-        def _got_length(data):
-            length = struct.unpack(">L", data)[0]
-            return self._read(offset+4, length)
-        d.addCallback(_got_length)
-        return d
-
-    def _read(self, offset, length):
-        return self._rref.callRemote("read", offset, length)
diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py
index 71626252169d935b04e6395990b6e3065feff903..70db7ab738d5acd30b3ef86d013ae43b8f4fb43a 100644 (file)
@@ -7,8 +7,9 @@ import itertools
 from allmydata import interfaces
 from allmydata.util import fileutil, hashutil
 from allmydata.storage import BucketWriter, BucketReader, \
-     WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \
+     StorageServer, MutableShareFile, \
      storage_index_to_dir, DataTooLargeError, LeaseInfo
+from allmydata.immutable.layout import WriteBucketProxy, ReadBucketProxy
 from allmydata.interfaces import BadWriteEnablerError
 from allmydata.test.common import LoggingServiceParent