git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
rename storageserver.py to just storage.py, since it has both server and client sides now
author Brian Warner <warner@lothar.com>
Sat, 14 Jul 2007 00:25:45 +0000 (17:25 -0700)
committer Brian Warner <warner@lothar.com>
Sat, 14 Jul 2007 00:25:45 +0000 (17:25 -0700)
src/allmydata/client.py
src/allmydata/download.py
src/allmydata/scripts/debug.py
src/allmydata/storage.py [new file with mode: 0644]
src/allmydata/storageserver.py [deleted file]
src/allmydata/test/test_storage.py
src/allmydata/upload.py

diff --git a/src/allmydata/client.py b/src/allmydata/client.py
index 21c66f3aef0fe47fa2a1576f7a19897919f753f1..d0099e3e2679b934c467ad7b1ea12d629bad7127 100644 (file)
@@ -11,7 +11,7 @@ from twisted.python import log
 
 import allmydata
 from allmydata.Crypto.Util.number import bytes_to_long
-from allmydata.storageserver import StorageServer
+from allmydata.storage import StorageServer
 from allmydata.upload import Uploader
 from allmydata.download import Downloader
 from allmydata.webish import WebishServer
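
(Note: the hunk above only renames an import. For orientation, a minimal sketch, not part of this commit, of how a node could attach the renamed StorageServer as a Twisted service; the "storage" subdirectory name and the hook_up_storage helper are illustrative assumptions.)

    import os
    from allmydata.storage import StorageServer

    def hook_up_storage(client, basedir, sizelimit=None):
        # illustrative helper, not from the tahoe codebase: StorageServer is a
        # MultiService (name 'storageserver'), so it can hang off the client node
        storedir = os.path.join(basedir, "storage")   # assumed location
        ss = StorageServer(storedir, sizelimit)
        ss.setServiceParent(client)                   # client is a MultiService too
        return ss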
diff --git a/src/allmydata/download.py b/src/allmydata/download.py
index 59649c9c11add140bdfa00666857b1add41a9197..f76b13c35b6b885c075aff477ebcce6302c70f08 100644 (file)
@@ -7,7 +7,7 @@ from twisted.application import service
 
 from allmydata.util import idlib, mathutil, hashutil
 from allmydata.util.assertutil import _assert
-from allmydata import codec, hashtree, storageserver, uri
+from allmydata import codec, hashtree, storage, uri
 from allmydata.Crypto.Cipher import AES
 from allmydata.interfaces import IDownloadTarget, IDownloader
 from allmydata.encode import NotEnoughPeersError
@@ -345,7 +345,7 @@ class FileDownloader:
     def _got_response(self, buckets, connection):
         _assert(isinstance(buckets, dict), buckets) # soon foolscap will check this for us with its DictOf schema constraint
         for sharenum, bucket in buckets.iteritems():
-            b = storageserver.ReadBucketProxy(bucket)
+            b = storage.ReadBucketProxy(bucket)
             self.add_share_bucket(sharenum, b)
             self._uri_extension_sources.append(b)
 
diff --git a/src/allmydata/scripts/debug.py b/src/allmydata/scripts/debug.py
index d6a5388502053d183b449d5a439c1d2a867d6dfc..773c674808085719a7969be2e304a1fbbc73f209 100644 (file)
@@ -43,12 +43,12 @@ class DumpDirnodeOptions(BasedirMixin, usage.Options):
             raise usage.UsageError("<uri> parameter is required")
 
 def dump_uri_extension(config, out=sys.stdout, err=sys.stderr):
-    from allmydata import uri, storageserver
+    from allmydata import uri, storage
 
     filename = config['filename']
     f = open(filename,"rb")
     # use a ReadBucketProxy to parse the bucket and find the uri extension
-    bp = storageserver.ReadBucketProxy(None)
+    bp = storage.ReadBucketProxy(None)
     offsets = bp._parse_offsets(f.read(8*4))
     f.seek(offsets['uri_extension'])
     length = struct.unpack(">L", f.read(4))[0]
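
(The download.py and debug.py hunks above just switch the module name on ReadBucketProxy. As a rough sketch of the remote usage pattern from download.py, assuming rref is a foolscap RemoteReference to an RIBucketReader obtained elsewhere; fetch_first_block is a hypothetical helper, not code from this commit.)

    from allmydata import storage

    def fetch_first_block(rref):
        b = storage.ReadBucketProxy(rref)
        d = b.startIfNecessary()   # reads and parses the 8-word offset header
        d.addCallback(lambda proxy: proxy.get_block(0))
        return d                   # fires with the bytes of block 0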
diff --git a/src/allmydata/storage.py b/src/allmydata/storage.py
new file mode 100644 (file)
index 0000000..b5acdee
--- /dev/null
@@ -0,0 +1,389 @@
+import os, re, weakref, stat, struct
+
+from foolscap import Referenceable
+from twisted.application import service
+from twisted.internet import defer
+
+from zope.interface import implements
+from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
+     RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE
+from allmydata.util import fileutil, idlib, mathutil
+from allmydata.util.assertutil import precondition
+
+# store/
+# store/incoming # temp dirs named $STORAGEINDEX/$SHARENUM which will be moved to store/$STORAGEINDEX/$SHARENUM on success
+# store/$STORAGEINDEX
+# store/$STORAGEINDEX/$SHARENUM
+# store/$STORAGEINDEX/$SHARENUM/blocksize
+# store/$STORAGEINDEX/$SHARENUM/data
+# store/$STORAGEINDEX/$SHARENUM/blockhashes
+# store/$STORAGEINDEX/$SHARENUM/sharehashtree
+
+# $SHARENUM matches this regex:
+NUM_RE=re.compile("[0-9]*")
+
+class BucketWriter(Referenceable):
+    implements(RIBucketWriter)
+
+    def __init__(self, ss, incominghome, finalhome, size):
+        self.ss = ss
+        self.incominghome = incominghome
+        self.finalhome = finalhome
+        self._size = size
+        self.closed = False
+        # touch the file, so later callers will see that we're working on it
+        f = open(self.incominghome, 'ab')
+        f.close()
+
+    def allocated_size(self):
+        return self._size
+
+    def remote_write(self, offset, data):
+        precondition(not self.closed)
+        precondition(offset >= 0)
+        precondition(offset+len(data) <= self._size)
+        f = open(self.incominghome, 'ab')
+        f.seek(offset)
+        f.write(data)
+        f.close()
+
+    def remote_close(self):
+        precondition(not self.closed)
+        fileutil.rename(self.incominghome, self.finalhome)
+        self.closed = True
+        filelen = os.stat(self.finalhome)[stat.ST_SIZE]
+        self.ss.bucket_writer_closed(self, filelen)
+
+
+class BucketReader(Referenceable):
+    implements(RIBucketReader)
+
+    def __init__(self, home):
+        self.home = home
+
+    def remote_read(self, offset, length):
+        f = open(self.home, 'rb')
+        f.seek(offset)
+        return f.read(length)
+
+class StorageServer(service.MultiService, Referenceable):
+    implements(RIStorageServer)
+    name = 'storageserver'
+
+    def __init__(self, storedir, sizelimit=None):
+        service.MultiService.__init__(self)
+        fileutil.make_dirs(storedir)
+        self.storedir = storedir
+        self.sizelimit = sizelimit
+        self.incomingdir = os.path.join(storedir, 'incoming')
+        self._clean_incomplete()
+        fileutil.make_dirs(self.incomingdir)
+        self._active_writers = weakref.WeakKeyDictionary()
+
+        self.measure_size()
+
+    def _clean_incomplete(self):
+        fileutil.rm_dir(self.incomingdir)
+
+    def measure_size(self):
+        self.consumed = fileutil.du(self.storedir)
+
+    def allocated_size(self):
+        space = self.consumed
+        for bw in self._active_writers:
+            space += bw.allocated_size()
+        return space
+
+    def remote_allocate_buckets(self, storage_index, sharenums, allocated_size,
+                                canary):
+        alreadygot = set()
+        bucketwriters = {} # k: shnum, v: BucketWriter
+        si_s = idlib.b2a(storage_index)
+        space_per_bucket = allocated_size
+        no_limits = self.sizelimit is None
+        yes_limits = not no_limits
+        if yes_limits:
+            remaining_space = self.sizelimit - self.allocated_size()
+        for shnum in sharenums:
+            incominghome = os.path.join(self.incomingdir, si_s, "%d" % shnum)
+            finalhome = os.path.join(self.storedir, si_s, "%d" % shnum)
+            if os.path.exists(incominghome) or os.path.exists(finalhome):
+                alreadygot.add(shnum)
+            elif no_limits or remaining_space >= space_per_bucket:
+                fileutil.make_dirs(os.path.join(self.incomingdir, si_s))
+                bw = BucketWriter(self, incominghome, finalhome,
+                                  space_per_bucket)
+                bucketwriters[shnum] = bw
+                self._active_writers[bw] = 1
+                if yes_limits:
+                    remaining_space -= space_per_bucket
+            else:
+                # not enough space to accept this bucket
+                pass
+
+        if bucketwriters:
+            fileutil.make_dirs(os.path.join(self.storedir, si_s))
+
+        return alreadygot, bucketwriters
+
+    def bucket_writer_closed(self, bw, consumed_size):
+        self.consumed += consumed_size
+        del self._active_writers[bw]
+
+    def remote_get_buckets(self, storage_index):
+        bucketreaders = {} # k: sharenum, v: BucketReader
+        storagedir = os.path.join(self.storedir, idlib.b2a(storage_index))
+        try:
+            for f in os.listdir(storagedir):
+                if NUM_RE.match(f):
+                    br = BucketReader(os.path.join(storagedir, f))
+                    bucketreaders[int(f)] = br
+        except OSError:
+            # Commonly caused by there being no buckets at all.
+            pass
+
+        return bucketreaders
+
+"""
+Share data is written into a single file. At the start of the file, there is
+a series of four-byte big-endian offset values, which indicate where each
+section starts. Each offset is measured from the beginning of the file.
+
+0x00: segment size
+0x04: data size
+0x08: offset of data (=00 00 00 1c)
+0x0c: offset of plaintext_hash_tree
+0x10: offset of crypttext_hash_tree
+0x14: offset of block_hashes
+0x18: offset of share_hashes
+0x1c: offset of uri_extension_length + uri_extension
+0x20: start of data
+?   : start of plaintext_hash_tree
+?   : start of crypttext_hash_tree
+?   : start of block_hashes
+?   : start of share_hashes
+       each share_hash is written as a two-byte (big-endian) hashnum
+       followed by the 32-byte SHA-256 hash. We only store the hashes
+       necessary to validate the share hash root
+?   : start of uri_extension_length (four-byte big-endian value)
+?   : start of uri_extension
+"""
+
+def allocated_size(data_size, num_segments, num_share_hashes,
+                   uri_extension_size):
+    wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes,
+                           uri_extension_size)
+    uri_extension_starts_at = wbp._offsets['uri_extension']
+    return uri_extension_starts_at + 4 + uri_extension_size
+
+class WriteBucketProxy:
+    implements(IStorageBucketWriter)
+    def __init__(self, rref, data_size, segment_size, num_segments,
+                 num_share_hashes, uri_extension_size):
+        self._rref = rref
+        self._data_size = data_size
+        self._segment_size = segment_size
+        self._num_segments = num_segments
+
+        self._segment_hash_size = (2*num_segments - 1) * HASH_SIZE
+        # how many share hashes are included in each share? This will be
+        # about ln2(num_shares).
+        self._share_hash_size = num_share_hashes * (2+HASH_SIZE)
+        # we commit to not sending a uri extension larger than this
+        self._uri_extension_size = uri_extension_size
+
+        offsets = self._offsets = {}
+        x = 0x20
+        offsets['data'] = x
+        x += data_size
+        offsets['plaintext_hash_tree'] = x
+        x += self._segment_hash_size
+        offsets['crypttext_hash_tree'] = x
+        x += self._segment_hash_size
+        offsets['block_hashes'] = x
+        x += self._segment_hash_size
+        offsets['share_hashes'] = x
+        x += self._share_hash_size
+        offsets['uri_extension'] = x
+
+        offset_data = struct.pack(">LLLLLLLL",
+                                  segment_size,
+                                  data_size,
+                                  offsets['data'],
+                                  offsets['plaintext_hash_tree'],
+                                  offsets['crypttext_hash_tree'],
+                                  offsets['block_hashes'],
+                                  offsets['share_hashes'],
+                                  offsets['uri_extension'],
+                                  )
+        assert len(offset_data) == 8*4
+        self._offset_data = offset_data
+
+    def start(self):
+        return self._write(0, self._offset_data)
+
+    def put_block(self, segmentnum, data):
+        offset = self._offsets['data'] + segmentnum * self._segment_size
+        assert offset + len(data) <= self._offsets['uri_extension']
+        assert isinstance(data, str)
+        if segmentnum < self._num_segments-1:
+            precondition(len(data) == self._segment_size,
+                         len(data), self._segment_size)
+        else:
+            precondition(len(data) == (self._data_size -
+                                       (self._segment_size *
+                                        (self._num_segments - 1))),
+                         len(data), self._segment_size)
+        return self._write(offset, data)
+
+    def put_plaintext_hashes(self, hashes):
+        offset = self._offsets['plaintext_hash_tree']
+        assert isinstance(hashes, list)
+        data = "".join(hashes)
+        assert len(data) == self._segment_hash_size
+        assert offset + len(data) <= self._offsets['crypttext_hash_tree']
+        return self._write(offset, data)
+
+    def put_crypttext_hashes(self, hashes):
+        offset = self._offsets['crypttext_hash_tree']
+        assert isinstance(hashes, list)
+        data = "".join(hashes)
+        assert len(data) == self._segment_hash_size
+        assert offset + len(data) <= self._offsets['block_hashes']
+        return self._write(offset, data)
+
+    def put_block_hashes(self, blockhashes):
+        offset = self._offsets['block_hashes']
+        assert isinstance(blockhashes, list)
+        data = "".join(blockhashes)
+        assert len(data) == self._segment_hash_size
+        assert offset + len(data) <= self._offsets['share_hashes']
+        return self._write(offset, data)
+
+    def put_share_hashes(self, sharehashes):
+        # sharehashes is a list of (index, hash) tuples, so they get stored
+        # as 2+32=34 bytes each
+        offset = self._offsets['share_hashes']
+        assert isinstance(sharehashes, list)
+        data = "".join([struct.pack(">H", hashnum) + hashvalue
+                        for hashnum,hashvalue in sharehashes])
+        precondition(len(data) == self._share_hash_size,
+                     len(data), self._share_hash_size)
+        assert offset + len(data) <= self._offsets['uri_extension']
+        return self._write(offset, data)
+
+    def put_uri_extension(self, data):
+        offset = self._offsets['uri_extension']
+        assert isinstance(data, str)
+        assert len(data) <= self._uri_extension_size
+        length = struct.pack(">L", len(data))
+        return self._write(offset, length+data)
+
+    def _write(self, offset, data):
+        # TODO: for small shares, buffer the writes and do just a single call
+        return self._rref.callRemote("write", offset, data)
+
+    def close(self):
+        return self._rref.callRemote("close")
+
+class ReadBucketProxy:
+    implements(IStorageBucketReader)
+    def __init__(self, rref):
+        self._rref = rref
+        self._started = False
+
+    def startIfNecessary(self):
+        if self._started:
+            return defer.succeed(self)
+        d = self.start()
+        d.addCallback(lambda res: self)
+        return d
+
+    def start(self):
+        # TODO: for small shares, read the whole bucket in start()
+        d = self._read(0, 8*4)
+        d.addCallback(self._parse_offsets)
+        return d
+
+    def _parse_offsets(self, data):
+        precondition(len(data) == 8*4)
+        self._offsets = {}
+        self._segment_size = struct.unpack(">L", data[0:4])[0]
+        self._data_size = struct.unpack(">L", data[4:8])[0]
+        x = 0x08
+        for field in ( 'data',
+                       'plaintext_hash_tree',
+                       'crypttext_hash_tree',
+                       'block_hashes',
+                       'share_hashes',
+                       'uri_extension',
+                       ):
+            offset = struct.unpack(">L", data[x:x+4])[0]
+            x += 4
+            self._offsets[field] = offset
+        return self._offsets
+
+    def get_block(self, blocknum):
+        num_segments = mathutil.div_ceil(self._data_size, self._segment_size)
+        if blocknum < num_segments-1:
+            size = self._segment_size
+        else:
+            size = self._data_size % self._segment_size
+            if size == 0:
+                size = self._segment_size
+        offset = self._offsets['data'] + blocknum * self._segment_size
+        return self._read(offset, size)
+
+    def _str2l(self, s):
+        """ split string (pulled from storage) into a list of blockids """
+        return [ s[i:i+HASH_SIZE]
+                 for i in range(0, len(s), HASH_SIZE) ]
+
+    def get_plaintext_hashes(self):
+        offset = self._offsets['plaintext_hash_tree']
+        size = self._offsets['crypttext_hash_tree'] - offset
+        d = self._read(offset, size)
+        d.addCallback(self._str2l)
+        return d
+
+    def get_crypttext_hashes(self):
+        offset = self._offsets['crypttext_hash_tree']
+        size = self._offsets['block_hashes'] - offset
+        d = self._read(offset, size)
+        d.addCallback(self._str2l)
+        return d
+
+    def get_block_hashes(self):
+        offset = self._offsets['block_hashes']
+        size = self._offsets['share_hashes'] - offset
+        d = self._read(offset, size)
+        d.addCallback(self._str2l)
+        return d
+
+    def get_share_hashes(self):
+        offset = self._offsets['share_hashes']
+        size = self._offsets['uri_extension'] - offset
+        assert size % (2+HASH_SIZE) == 0
+        d = self._read(offset, size)
+        def _unpack_share_hashes(data):
+            assert len(data) == size
+            hashes = []
+            for i in range(0, size, 2+HASH_SIZE):
+                hashnum = struct.unpack(">H", data[i:i+2])[0]
+                hashvalue = data[i+2:i+2+HASH_SIZE]
+                hashes.append( (hashnum, hashvalue) )
+            return hashes
+        d.addCallback(_unpack_share_hashes)
+        return d
+
+    def get_uri_extension(self):
+        offset = self._offsets['uri_extension']
+        d = self._read(offset, 4)
+        def _got_length(data):
+            length = struct.unpack(">L", data)[0]
+            return self._read(offset+4, length)
+        d.addCallback(_got_length)
+        return d
+
+    def _read(self, offset, length):
+        return self._rref.callRemote("read", offset, length)
diff --git a/src/allmydata/storageserver.py b/src/allmydata/storageserver.py
deleted file mode 100644 (file)
index b5acdee..0000000
+++ /dev/null
@@ -1,389 +0,0 @@
-import os, re, weakref, stat, struct
-
-from foolscap import Referenceable
-from twisted.application import service
-from twisted.internet import defer
-
-from zope.interface import implements
-from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
-     RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE
-from allmydata.util import fileutil, idlib, mathutil
-from allmydata.util.assertutil import precondition
-
-# store/
-# store/incoming # temp dirs named $STORAGEINDEX/$SHARENUM which will be moved to store/$STORAGEINDEX/$SHARENUM on success
-# store/$STORAGEINDEX
-# store/$STORAGEINDEX/$SHARENUM
-# store/$STORAGEINDEX/$SHARENUM/blocksize
-# store/$STORAGEINDEX/$SHARENUM/data
-# store/$STORAGEINDEX/$SHARENUM/blockhashes
-# store/$STORAGEINDEX/$SHARENUM/sharehashtree
-
-# $SHARENUM matches this regex:
-NUM_RE=re.compile("[0-9]*")
-
-class BucketWriter(Referenceable):
-    implements(RIBucketWriter)
-
-    def __init__(self, ss, incominghome, finalhome, size):
-        self.ss = ss
-        self.incominghome = incominghome
-        self.finalhome = finalhome
-        self._size = size
-        self.closed = False
-        # touch the file, so later callers will see that we're working on it
-        f = open(self.incominghome, 'ab')
-        f.close()
-
-    def allocated_size(self):
-        return self._size
-
-    def remote_write(self, offset, data):
-        precondition(not self.closed)
-        precondition(offset >= 0)
-        precondition(offset+len(data) <= self._size)
-        f = open(self.incominghome, 'ab')
-        f.seek(offset)
-        f.write(data)
-        f.close()
-
-    def remote_close(self):
-        precondition(not self.closed)
-        fileutil.rename(self.incominghome, self.finalhome)
-        self.closed = True
-        filelen = os.stat(self.finalhome)[stat.ST_SIZE]
-        self.ss.bucket_writer_closed(self, filelen)
-
-
-class BucketReader(Referenceable):
-    implements(RIBucketReader)
-
-    def __init__(self, home):
-        self.home = home
-
-    def remote_read(self, offset, length):
-        f = open(self.home, 'rb')
-        f.seek(offset)
-        return f.read(length)
-
-class StorageServer(service.MultiService, Referenceable):
-    implements(RIStorageServer)
-    name = 'storageserver'
-
-    def __init__(self, storedir, sizelimit=None):
-        service.MultiService.__init__(self)
-        fileutil.make_dirs(storedir)
-        self.storedir = storedir
-        self.sizelimit = sizelimit
-        self.incomingdir = os.path.join(storedir, 'incoming')
-        self._clean_incomplete()
-        fileutil.make_dirs(self.incomingdir)
-        self._active_writers = weakref.WeakKeyDictionary()
-
-        self.measure_size()
-
-    def _clean_incomplete(self):
-        fileutil.rm_dir(self.incomingdir)
-
-    def measure_size(self):
-        self.consumed = fileutil.du(self.storedir)
-
-    def allocated_size(self):
-        space = self.consumed
-        for bw in self._active_writers:
-            space += bw.allocated_size()
-        return space
-
-    def remote_allocate_buckets(self, storage_index, sharenums, allocated_size,
-                                canary):
-        alreadygot = set()
-        bucketwriters = {} # k: shnum, v: BucketWriter
-        si_s = idlib.b2a(storage_index)
-        space_per_bucket = allocated_size
-        no_limits = self.sizelimit is None
-        yes_limits = not no_limits
-        if yes_limits:
-            remaining_space = self.sizelimit - self.allocated_size()
-        for shnum in sharenums:
-            incominghome = os.path.join(self.incomingdir, si_s, "%d" % shnum)
-            finalhome = os.path.join(self.storedir, si_s, "%d" % shnum)
-            if os.path.exists(incominghome) or os.path.exists(finalhome):
-                alreadygot.add(shnum)
-            elif no_limits or remaining_space >= space_per_bucket:
-                fileutil.make_dirs(os.path.join(self.incomingdir, si_s))
-                bw = BucketWriter(self, incominghome, finalhome,
-                                  space_per_bucket)
-                bucketwriters[shnum] = bw
-                self._active_writers[bw] = 1
-                if yes_limits:
-                    remaining_space -= space_per_bucket
-            else:
-                # not enough space to accept this bucket
-                pass
-
-        if bucketwriters:
-            fileutil.make_dirs(os.path.join(self.storedir, si_s))
-
-        return alreadygot, bucketwriters
-
-    def bucket_writer_closed(self, bw, consumed_size):
-        self.consumed += consumed_size
-        del self._active_writers[bw]
-
-    def remote_get_buckets(self, storage_index):
-        bucketreaders = {} # k: sharenum, v: BucketReader
-        storagedir = os.path.join(self.storedir, idlib.b2a(storage_index))
-        try:
-            for f in os.listdir(storagedir):
-                if NUM_RE.match(f):
-                    br = BucketReader(os.path.join(storagedir, f))
-                    bucketreaders[int(f)] = br
-        except OSError:
-            # Commonly caused by there being no buckets at all.
-            pass
-
-        return bucketreaders
-
-"""
-Share data is written into a single file. At the start of the file, there is
-a series of four-byte big-endian offset values, which indicate where each
-section starts. Each offset is measured from the beginning of the file.
-
-0x00: segment size
-0x04: data size
-0x08: offset of data (=00 00 00 1c)
-0x0c: offset of plaintext_hash_tree
-0x10: offset of crypttext_hash_tree
-0x14: offset of block_hashes
-0x18: offset of share_hashes
-0x1c: offset of uri_extension_length + uri_extension
-0x20: start of data
-?   : start of plaintext_hash_tree
-?   : start of crypttext_hash_tree
-?   : start of block_hashes
-?   : start of share_hashes
-       each share_hash is written as a two-byte (big-endian) hashnum
-       followed by the 32-byte SHA-256 hash. We only store the hashes
-       necessary to validate the share hash root
-?   : start of uri_extension_length (four-byte big-endian value)
-?   : start of uri_extension
-"""
-
-def allocated_size(data_size, num_segments, num_share_hashes,
-                   uri_extension_size):
-    wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes,
-                           uri_extension_size)
-    uri_extension_starts_at = wbp._offsets['uri_extension']
-    return uri_extension_starts_at + 4 + uri_extension_size
-
-class WriteBucketProxy:
-    implements(IStorageBucketWriter)
-    def __init__(self, rref, data_size, segment_size, num_segments,
-                 num_share_hashes, uri_extension_size):
-        self._rref = rref
-        self._data_size = data_size
-        self._segment_size = segment_size
-        self._num_segments = num_segments
-
-        self._segment_hash_size = (2*num_segments - 1) * HASH_SIZE
-        # how many share hashes are included in each share? This will be
-        # about ln2(num_shares).
-        self._share_hash_size = num_share_hashes * (2+HASH_SIZE)
-        # we commit to not sending a uri extension larger than this
-        self._uri_extension_size = uri_extension_size
-
-        offsets = self._offsets = {}
-        x = 0x20
-        offsets['data'] = x
-        x += data_size
-        offsets['plaintext_hash_tree'] = x
-        x += self._segment_hash_size
-        offsets['crypttext_hash_tree'] = x
-        x += self._segment_hash_size
-        offsets['block_hashes'] = x
-        x += self._segment_hash_size
-        offsets['share_hashes'] = x
-        x += self._share_hash_size
-        offsets['uri_extension'] = x
-
-        offset_data = struct.pack(">LLLLLLLL",
-                                  segment_size,
-                                  data_size,
-                                  offsets['data'],
-                                  offsets['plaintext_hash_tree'],
-                                  offsets['crypttext_hash_tree'],
-                                  offsets['block_hashes'],
-                                  offsets['share_hashes'],
-                                  offsets['uri_extension'],
-                                  )
-        assert len(offset_data) == 8*4
-        self._offset_data = offset_data
-
-    def start(self):
-        return self._write(0, self._offset_data)
-
-    def put_block(self, segmentnum, data):
-        offset = self._offsets['data'] + segmentnum * self._segment_size
-        assert offset + len(data) <= self._offsets['uri_extension']
-        assert isinstance(data, str)
-        if segmentnum < self._num_segments-1:
-            precondition(len(data) == self._segment_size,
-                         len(data), self._segment_size)
-        else:
-            precondition(len(data) == (self._data_size -
-                                       (self._segment_size *
-                                        (self._num_segments - 1))),
-                         len(data), self._segment_size)
-        return self._write(offset, data)
-
-    def put_plaintext_hashes(self, hashes):
-        offset = self._offsets['plaintext_hash_tree']
-        assert isinstance(hashes, list)
-        data = "".join(hashes)
-        assert len(data) == self._segment_hash_size
-        assert offset + len(data) <= self._offsets['crypttext_hash_tree']
-        return self._write(offset, data)
-
-    def put_crypttext_hashes(self, hashes):
-        offset = self._offsets['crypttext_hash_tree']
-        assert isinstance(hashes, list)
-        data = "".join(hashes)
-        assert len(data) == self._segment_hash_size
-        assert offset + len(data) <= self._offsets['block_hashes']
-        return self._write(offset, data)
-
-    def put_block_hashes(self, blockhashes):
-        offset = self._offsets['block_hashes']
-        assert isinstance(blockhashes, list)
-        data = "".join(blockhashes)
-        assert len(data) == self._segment_hash_size
-        assert offset + len(data) <= self._offsets['share_hashes']
-        return self._write(offset, data)
-
-    def put_share_hashes(self, sharehashes):
-        # sharehashes is a list of (index, hash) tuples, so they get stored
-        # as 2+32=34 bytes each
-        offset = self._offsets['share_hashes']
-        assert isinstance(sharehashes, list)
-        data = "".join([struct.pack(">H", hashnum) + hashvalue
-                        for hashnum,hashvalue in sharehashes])
-        precondition(len(data) == self._share_hash_size,
-                     len(data), self._share_hash_size)
-        assert offset + len(data) <= self._offsets['uri_extension']
-        return self._write(offset, data)
-
-    def put_uri_extension(self, data):
-        offset = self._offsets['uri_extension']
-        assert isinstance(data, str)
-        assert len(data) <= self._uri_extension_size
-        length = struct.pack(">L", len(data))
-        return self._write(offset, length+data)
-
-    def _write(self, offset, data):
-        # TODO: for small shares, buffer the writes and do just a single call
-        return self._rref.callRemote("write", offset, data)
-
-    def close(self):
-        return self._rref.callRemote("close")
-
-class ReadBucketProxy:
-    implements(IStorageBucketReader)
-    def __init__(self, rref):
-        self._rref = rref
-        self._started = False
-
-    def startIfNecessary(self):
-        if self._started:
-            return defer.succeed(self)
-        d = self.start()
-        d.addCallback(lambda res: self)
-        return d
-
-    def start(self):
-        # TODO: for small shares, read the whole bucket in start()
-        d = self._read(0, 8*4)
-        d.addCallback(self._parse_offsets)
-        return d
-
-    def _parse_offsets(self, data):
-        precondition(len(data) == 8*4)
-        self._offsets = {}
-        self._segment_size = struct.unpack(">L", data[0:4])[0]
-        self._data_size = struct.unpack(">L", data[4:8])[0]
-        x = 0x08
-        for field in ( 'data',
-                       'plaintext_hash_tree',
-                       'crypttext_hash_tree',
-                       'block_hashes',
-                       'share_hashes',
-                       'uri_extension',
-                       ):
-            offset = struct.unpack(">L", data[x:x+4])[0]
-            x += 4
-            self._offsets[field] = offset
-        return self._offsets
-
-    def get_block(self, blocknum):
-        num_segments = mathutil.div_ceil(self._data_size, self._segment_size)
-        if blocknum < num_segments-1:
-            size = self._segment_size
-        else:
-            size = self._data_size % self._segment_size
-            if size == 0:
-                size = self._segment_size
-        offset = self._offsets['data'] + blocknum * self._segment_size
-        return self._read(offset, size)
-
-    def _str2l(self, s):
-        """ split string (pulled from storage) into a list of blockids """
-        return [ s[i:i+HASH_SIZE]
-                 for i in range(0, len(s), HASH_SIZE) ]
-
-    def get_plaintext_hashes(self):
-        offset = self._offsets['plaintext_hash_tree']
-        size = self._offsets['crypttext_hash_tree'] - offset
-        d = self._read(offset, size)
-        d.addCallback(self._str2l)
-        return d
-
-    def get_crypttext_hashes(self):
-        offset = self._offsets['crypttext_hash_tree']
-        size = self._offsets['block_hashes'] - offset
-        d = self._read(offset, size)
-        d.addCallback(self._str2l)
-        return d
-
-    def get_block_hashes(self):
-        offset = self._offsets['block_hashes']
-        size = self._offsets['share_hashes'] - offset
-        d = self._read(offset, size)
-        d.addCallback(self._str2l)
-        return d
-
-    def get_share_hashes(self):
-        offset = self._offsets['share_hashes']
-        size = self._offsets['uri_extension'] - offset
-        assert size % (2+HASH_SIZE) == 0
-        d = self._read(offset, size)
-        def _unpack_share_hashes(data):
-            assert len(data) == size
-            hashes = []
-            for i in range(0, size, 2+HASH_SIZE):
-                hashnum = struct.unpack(">H", data[i:i+2])[0]
-                hashvalue = data[i+2:i+2+HASH_SIZE]
-                hashes.append( (hashnum, hashvalue) )
-            return hashes
-        d.addCallback(_unpack_share_hashes)
-        return d
-
-    def get_uri_extension(self):
-        offset = self._offsets['uri_extension']
-        d = self._read(offset, 4)
-        def _got_length(data):
-            length = struct.unpack(">L", data)[0]
-            return self._read(offset+4, length)
-        d.addCallback(_got_length)
-        return d
-
-    def _read(self, offset, length):
-        return self._rref.callRemote("read", offset, length)
diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py
index 01e282a8ca96a88f2bfb82a8a8cbecec6a59c094..d58b064bbdbe210afaa9021fb0d0290e816a8297 100644 (file)
@@ -7,7 +7,7 @@ from foolscap import Referenceable
 import os.path
 from allmydata import interfaces
 from allmydata.util import fileutil, hashutil
-from allmydata.storageserver import BucketWriter, BucketReader, \
+from allmydata.storage import BucketWriter, BucketReader, \
      WriteBucketProxy, ReadBucketProxy, StorageServer
 
 
diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py
index 404b86de025f8613c0fa21e38b6afc071ae29bec..bebb46496eb5b16d788872aa355265217f5453bb 100644 (file)
@@ -5,7 +5,7 @@ from twisted.application import service
 from foolscap import Referenceable
 
 from allmydata.util import idlib, hashutil
-from allmydata import encode, storageserver, hashtree
+from allmydata import encode, storage, hashtree
 from allmydata.uri import pack_uri, pack_lit
 from allmydata.interfaces import IUploadable, IUploader
 from allmydata.Crypto.Cipher import AES
@@ -39,10 +39,10 @@ class PeerTracker:
         self.buckets = {} # k: shareid, v: IRemoteBucketWriter
         self.sharesize = sharesize
         #print "PeerTracker", peerid, permutedid, sharesize
-        as = storageserver.allocated_size(sharesize,
-                                          num_segments,
-                                          num_share_hashes,
-                                          EXTENSION_SIZE)
+        as = storage.allocated_size(sharesize,
+                                    num_segments,
+                                    num_share_hashes,
+                                    EXTENSION_SIZE)
         self.allocated_size = as
                                                            
         self.blocksize = blocksize
@@ -74,11 +74,11 @@ class PeerTracker:
         #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
         b = {}
         for sharenum, rref in buckets.iteritems():
-            bp = storageserver.WriteBucketProxy(rref, self.sharesize,
-                                                self.blocksize,
-                                                self.num_segments,
-                                                self.num_share_hashes,
-                                                EXTENSION_SIZE)
+            bp = storage.WriteBucketProxy(rref, self.sharesize,
+                                          self.blocksize,
+                                          self.num_segments,
+                                          self.num_share_hashes,
+                                          EXTENSION_SIZE)
             b[sharenum] = bp
         self.buckets.update(b)
         return (alreadygot, set(b.keys()))
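
(For comparison with the PeerTracker change above: storage.allocated_size() builds a throwaway WriteBucketProxy just to learn where the URI extension starts, then adds the four-byte length prefix plus the maximum extension size the client commits to. A quick sketch with made-up numbers, not values taken from the real uploader:)

    from allmydata import storage

    sharesize = 1000          # erasure-coded share data, example value
    num_segments = 4
    num_share_hashes = 6      # roughly log2 of the total number of shares
    EXTENSION_SIZE = 1000     # assumed upper bound on the uri_extension

    needed = storage.allocated_size(sharesize, num_segments,
                                    num_share_hashes, EXTENSION_SIZE)
    # needed = 0x20-byte header + share data + three per-segment hash trees
    #          + share-hash chain + 4-byte length prefix + EXTENSION_SIZE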