more #85 work, system test still fails
author     Brian Warner <warner@lothar.com>
           Fri, 13 Jul 2007 22:09:01 +0000 (15:09 -0700)
committer  Brian Warner <warner@lothar.com>
           Fri, 13 Jul 2007 22:09:01 +0000 (15:09 -0700)
src/allmydata/encode.py
src/allmydata/interfaces.py
src/allmydata/storageserver.py
src/allmydata/test/test_encode.py
src/allmydata/test/test_storage.py
src/allmydata/test/test_upload.py
src/allmydata/upload.py

diff --git a/src/allmydata/encode.py b/src/allmydata/encode.py
index a36b71a6e71b4057e4c8b5bae67b3c1f9c6ae144..f3af448b9c42d104dd796b70154d0fd5715e19ad 100644
@@ -162,10 +162,7 @@ class Encoder(object):
     def set_shareholders(self, landlords):
         assert isinstance(landlords, dict)
         for k in landlords:
-            # it would be nice to:
-            #assert RIBucketWriter.providedBy(landlords[k])
-            assert IStorageBucketWriter(landlords[k])
-            pass
+            assert IStorageBucketWriter.providedBy(landlords[k])
         self.landlords = landlords.copy()
 
     def start(self):
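
The encode.py hunk above replaces an adaptation-style check with an
explicit providedBy() test. Calling an interface, as in
IStorageBucketWriter(landlords[k]), asks zope.interface to adapt the
object: it raises TypeError when the object does not provide the
interface, so wrapping it in an assert is redundant and fails with the
wrong exception type. providedBy() is the plain boolean membership test
the assert wants. A minimal sketch of the difference, using a
hypothetical interface and class:

    from zope.interface import Interface, implements

    class IWriter(Interface):
        """A hypothetical marker interface."""

    class Writer:
        implements(IWriter)

    w = Writer()

    # boolean test: True iff the object declares the interface
    assert IWriter.providedBy(w)

    # adaptation: returns w when w provides IWriter, but raises
    # TypeError (not AssertionError) when it does not
    w2 = IWriter(w)
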
diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py
index 9a1f21453e4ff67ed6a07c8533adcffd778b6fa0..979a4e32c45b11b4c6aab3c2bac40ccffd6a3653 100644
@@ -80,7 +80,7 @@ class RIBucketReader(RemoteInterface):
 class RIStorageServer(RemoteInterface):
     def allocate_buckets(storage_index=StorageIndex,
                          sharenums=SetOf(int, maxLength=MAX_BUCKETS),
-                         sharesize=int, blocksize=int, canary=Referenceable):
+                         allocated_size=int, canary=Referenceable):
         """
         @param canary: If the canary is lost before close(), the bucket is deleted.
         @return: tuple of (alreadygot, allocated), where alreadygot is what we
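
The interface change above collapses two parameters (sharesize,
blocksize) into a single allocated_size: the client now computes the
total on-disk footprint of a share, hashes and URI extension included,
and asks the server for exactly that much. A hedged sketch of the call
as it looks after the change (storage_server and the values are
hypothetical placeholders):

    from foolscap import Referenceable

    storage_index = "\x00" * 32     # hypothetical
    sharenums = set([0, 1, 2])
    allocated_size = 4096           # precomputed total bytes per share

    d = storage_server.callRemote("allocate_buckets",
                                  storage_index, sharenums,
                                  allocated_size,
                                  canary=Referenceable())
    # fires with (alreadygot, allocated), per the docstring above
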
diff --git a/src/allmydata/storageserver.py b/src/allmydata/storageserver.py
index 4cc53e139f6ea2b5e09bfb82a51aa575fd7a45f3..74b4c894b5853fa2256f05a6895ee675c46683a5 100644
@@ -95,12 +95,12 @@ class StorageServer(service.MultiService, Referenceable):
             space += bw.allocated_size()
         return space
 
-    def remote_allocate_buckets(self, storage_index, sharenums, sharesize,
+    def remote_allocate_buckets(self, storage_index, sharenums, allocated_size,
                                 canary):
         alreadygot = set()
         bucketwriters = {} # k: shnum, v: BucketWriter
         si_s = idlib.b2a(storage_index)
-        space_per_bucket = sharesize
+        space_per_bucket = allocated_size
         no_limits = self.sizelimit is None
         yes_limits = not no_limits
         if yes_limits:
@@ -169,18 +169,28 @@ section starts. Each offset is measured from the beginning of the file.
       start of uri_extension
 """
 
+def allocated_size(data_size, num_segments, num_share_hashes,
+                   uri_extension_size):
+    wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes,
+                           uri_extension_size)
+    uri_extension_starts_at = wbp._offsets['uri_extension']
+    return uri_extension_starts_at + 4 + uri_extension_size
+
 class WriteBucketProxy:
     implements(IStorageBucketWriter)
     def __init__(self, rref, data_size, segment_size, num_segments,
-                 num_share_hashes):
+                 num_share_hashes, uri_extension_size):
         self._rref = rref
         self._segment_size = segment_size
+        self._num_segments = num_segments
 
         HASH_SIZE = interfaces.HASH_SIZE
         self._segment_hash_size = (2*num_segments - 1) * HASH_SIZE
         # how many share hashes are included in each share? This will be
         # about ln2(num_shares).
         self._share_hash_size = num_share_hashes * (2+HASH_SIZE)
+        # we commit to not sending a uri extension larger than this
+        self._uri_extension_size = uri_extension_size
 
         offsets = self._offsets = {}
         x = 0x1c
@@ -215,10 +225,12 @@ class WriteBucketProxy:
         offset = self._offsets['data'] + segmentnum * self._segment_size
         assert offset + len(data) <= self._offsets['uri_extension']
         assert isinstance(data, str)
-        if segmentnum < self._segment_size-1:
-            assert len(data) == self._segment_size
+        if segmentnum < self._num_segments-1:
+            precondition(len(data) == self._segment_size,
+                         len(data), self._segment_size)
         else:
-            assert len(data) <= self._segment_size
+            precondition(len(data) <= self._segment_size,
+                         len(data), self._segment_size)
         return self._write(offset, data)
 
     def put_plaintext_hashes(self, hashes):
@@ -252,13 +264,15 @@ class WriteBucketProxy:
         assert isinstance(sharehashes, list)
         data = "".join([struct.pack(">H", hashnum) + hashvalue
                         for hashnum,hashvalue in sharehashes])
-        assert len(data) == self._share_hash_size
+        precondition(len(data) == self._share_hash_size,
+                     len(data), self._share_hash_size)
         assert offset + len(data) <= self._offsets['uri_extension']
         return self._write(offset, data)
 
     def put_uri_extension(self, data):
         offset = self._offsets['uri_extension']
         assert isinstance(data, str)
+        assert len(data) <= self._uri_extension_size
         length = struct.pack(">L", len(data))
         return self._write(offset, length+data)
 
@@ -273,6 +287,7 @@ class ReadBucketProxy:
     implements(IStorageBucketReader)
     def __init__(self, rref):
         self._rref = rref
+        self._started = False
 
     def startIfNecessary(self):
         if self._started:
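
The new allocated_size() helper above sizes a share without sending
anything: it builds a throwaway WriteBucketProxy (rref=None, so no
remote calls are made), reads where the uri_extension section starts
from the proxy's offset table, and adds 4 bytes for the ">L" length
prefix that put_uri_extension() writes, plus the promised maximum
extension size. A usage sketch with hypothetical parameters:

    from allmydata.storageserver import allocated_size

    share_size = 1000       # erasure-coded share data, in bytes
    num_segments = 4
    num_share_hashes = 3    # roughly log2 of the number of shares
    uri_extension_size = 500

    needed = allocated_size(share_size, num_segments,
                            num_share_hashes, uri_extension_size)
    # needed == uri_extension offset + 4 (length prefix) + 500;
    # this is the number the client now passes to allocate_buckets
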
diff --git a/src/allmydata/test/test_encode.py b/src/allmydata/test/test_encode.py
index d4a42d0fb5992720543e6a9b60a60599add2b657..a70bfd6ecc8aee9b771a001f98116f7735cbf1b8 100644
@@ -3,7 +3,6 @@ from zope.interface import implements
 from twisted.trial import unittest
 from twisted.internet import defer
 from twisted.python.failure import Failure
-from foolscap import eventual
 from allmydata import encode, download, hashtree
 from allmydata.util import hashutil
 from allmydata.uri import pack_uri
@@ -11,45 +10,13 @@ from allmydata.Crypto.Cipher import AES
 from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader
 from cStringIO import StringIO
 
-class FakePeer:
-    def __init__(self, mode="good"):
-        self.ss = FakeStorageServer(mode)
-
-    def callRemote(self, methname, *args, **kwargs):
-        def _call():
-            meth = getattr(self, methname)
-            return meth(*args, **kwargs)
-        return defer.maybeDeferred(_call)
-
-    def get_service(self, sname):
-        assert sname == "storageserver"
-        return self.ss
-
-class FakeStorageServer:
-    def __init__(self, mode):
-        self.mode = mode
-    def callRemote(self, methname, *args, **kwargs):
-        def _call():
-            meth = getattr(self, methname)
-            return meth(*args, **kwargs)
-        d = eventual.fireEventually()
-        d.addCallback(lambda res: _call())
-        return d
-    def allocate_buckets(self, crypttext_hash, sharenums, shareize, blocksize, canary):
-        if self.mode == "full":
-            return (set(), {},)
-        elif self.mode == "already got them":
-            return (set(sharenums), {},)
-        else:
-            return (set(), dict([(shnum, FakeBucketWriter(),) for shnum in sharenums]),)
-
 class LostPeerError(Exception):
     pass
 
 def flip_bit(good): # flips the last bit
     return good[:-1] + chr(ord(good[-1]) ^ 0x01)
 
-class FakeBucketWriter:
+class FakeBucketWriterProxy:
     implements(IStorageBucketWriter, IStorageBucketReader)
     # these are used for both reading and writing
     def __init__(self, mode="good"):
@@ -195,7 +162,7 @@ class Encode(unittest.TestCase):
         shareholders = {}
         all_shareholders = []
         for shnum in range(NUM_SHARES):
-            peer = FakeBucketWriter()
+            peer = FakeBucketWriterProxy()
             shareholders[shnum] = peer
             all_shareholders.append(peer)
         e.set_shareholders(shareholders)
@@ -322,7 +289,7 @@ class Roundtrip(unittest.TestCase):
         all_peers = []
         for shnum in range(NUM_SHARES):
             mode = bucket_modes.get(shnum, "good")
-            peer = FakeBucketWriter(mode)
+            peer = FakeBucketWriterProxy(mode)
             shareholders[shnum] = peer
         e.set_shareholders(shareholders)
         plaintext_hasher = hashutil.plaintext_hasher()
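
The rename from FakeBucketWriter to FakeBucketWriterProxy is not
cosmetic: this commit adds a second, lower-level FakeBucketWriter to
test_upload.py (see below) that mimics storageserver.BucketWriter's
remote_write/remote_close wire protocol, while the class here stands in
for the higher-level IStorageBucketWriter proxy that the encoder
drives. A sketch of the two levels, with hypothetical objects:

    # proxy level: what Encoder talks to (IStorageBucketWriter)
    d = proxy.put_block(0, "a" * 25)
    d.addCallback(lambda res: proxy.put_uri_extension(ext_data))

    # bucket level: what the proxy turns those calls into
    d = bucket.callRemote("write", offset, data)
    d.addCallback(lambda res: bucket.callRemote("close"))
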
diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py
index 9f76d70ff340563920b983282f82054f11288563..7d6afd42b255a168ede3ac8de331a8e45143b6d6 100644
@@ -5,8 +5,10 @@ from twisted.application import service
 from twisted.internet import defer
 from foolscap import Referenceable
 import os.path
-from allmydata import storageserver, interfaces
+from allmydata import interfaces
 from allmydata.util import fileutil, hashutil
+from allmydata.storageserver import BucketWriter, BucketReader, \
+     WriteBucketProxy, ReadBucketProxy, StorageServer
 
 
 class Bucket(unittest.TestCase):
@@ -23,7 +25,7 @@ class Bucket(unittest.TestCase):
 
     def test_create(self):
         incoming, final = self.make_workdir("test_create")
-        bw = storageserver.BucketWriter(self, incoming, final, 200)
+        bw = BucketWriter(self, incoming, final, 200)
         bw.remote_write(0, "a"*25)
         bw.remote_write(25, "b"*25)
         bw.remote_write(50, "c"*25)
@@ -32,14 +34,14 @@ class Bucket(unittest.TestCase):
 
     def test_readwrite(self):
         incoming, final = self.make_workdir("test_readwrite")
-        bw = storageserver.BucketWriter(self, incoming, final, 200)
+        bw = BucketWriter(self, incoming, final, 200)
         bw.remote_write(0, "a"*25)
         bw.remote_write(25, "b"*25)
         bw.remote_write(50, "c"*7) # last block may be short
         bw.remote_close()
 
         # now read from it
-        br = storageserver.BucketReader(final)
+        br = BucketReader(final)
         self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
         self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
         self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
@@ -59,7 +61,7 @@ class BucketProxy(unittest.TestCase):
         final = os.path.join(basedir, "bucket")
         fileutil.make_dirs(basedir)
         fileutil.make_dirs(os.path.join(basedir, "tmp"))
-        bw = storageserver.BucketWriter(self, incoming, final, size)
+        bw = BucketWriter(self, incoming, final, size)
         rb = RemoteBucket()
         rb.target = bw
         return bw, rb, final
@@ -69,11 +71,12 @@ class BucketProxy(unittest.TestCase):
 
     def test_create(self):
         bw, rb, final = self.make_bucket("test_create", 500)
-        bp = storageserver.WriteBucketProxy(rb,
-                                            data_size=300,
-                                            segment_size=10,
-                                            num_segments=5,
-                                            num_share_hashes=3)
+        bp = WriteBucketProxy(rb,
+                              data_size=300,
+                              segment_size=10,
+                              num_segments=5,
+                              num_share_hashes=3,
+                              uri_extension_size=500)
         self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
 
     def test_readwrite(self):
@@ -98,11 +101,12 @@ class BucketProxy(unittest.TestCase):
         uri_extension = "s" + "E"*498 + "e"
 
         bw, rb, final = self.make_bucket("test_readwrite", 1406)
-        bp = storageserver.WriteBucketProxy(rb,
-                                            data_size=100,
-                                            segment_size=25,
-                                            num_segments=4,
-                                            num_share_hashes=3)
+        bp = WriteBucketProxy(rb,
+                              data_size=100,
+                              segment_size=25,
+                              num_segments=4,
+                              num_share_hashes=3,
+                              uri_extension_size=len(uri_extension))
 
         d = bp.start()
         d.addCallback(lambda res: bp.put_block(0, "a"*25))
@@ -118,13 +122,13 @@ class BucketProxy(unittest.TestCase):
 
         # now read everything back
         def _start_reading(res):
-            br = storageserver.BucketReader(final)
+            br = BucketReader(final)
             rb = RemoteBucket()
             rb.target = br
-            rbp = storageserver.ReadBucketProxy(rb)
+            rbp = ReadBucketProxy(rb)
             self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
 
-            d1 = rbp.start()
+            d1 = rbp.startIfNecessary()
             d1.addCallback(lambda res: rbp.get_block(0))
             d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
             d1.addCallback(lambda res: rbp.get_block(1))
@@ -169,7 +173,7 @@ class Server(unittest.TestCase):
 
     def create(self, name, sizelimit=None):
         workdir = self.workdir(name)
-        ss = storageserver.StorageServer(workdir, sizelimit)
+        ss = StorageServer(workdir, sizelimit)
         ss.setServiceParent(self.sparent)
         return ss
 
diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py
index 6b508513a1f979b0108e5a2362ef4eab9599d231..92a087632fd32402813b0d85e27af9a2d43287a8 100644
@@ -1,12 +1,79 @@
 
 from twisted.trial import unittest
 from twisted.python.failure import Failure
+from twisted.internet import defer
 from cStringIO import StringIO
 
-from allmydata import upload, encode
+from allmydata import upload, encode, storageserver
 from allmydata.uri import unpack_uri, unpack_lit
+from allmydata.util.assertutil import precondition
+from foolscap import eventual
 
-from test_encode import FakePeer
+class FakePeer:
+    def __init__(self, mode="good"):
+        self.ss = FakeStorageServer(mode)
+
+    def callRemote(self, methname, *args, **kwargs):
+        def _call():
+            meth = getattr(self, methname)
+            return meth(*args, **kwargs)
+        return defer.maybeDeferred(_call)
+
+    def get_service(self, sname):
+        assert sname == "storageserver"
+        return self.ss
+
+class FakeStorageServer:
+    def __init__(self, mode):
+        self.mode = mode
+    def callRemote(self, methname, *args, **kwargs):
+        def _call():
+            meth = getattr(self, methname)
+            return meth(*args, **kwargs)
+        d = eventual.fireEventually()
+        d.addCallback(lambda res: _call())
+        return d
+
+    def allocate_buckets(self, crypttext_hash, sharenums,
+                         share_size, blocksize, canary):
+        #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
+        if self.mode == "full":
+            return (set(), {},)
+        elif self.mode == "already got them":
+            return (set(sharenums), {},)
+        else:
+            return (set(),
+                    dict([( shnum, FakeBucketWriter(share_size) )
+                          for shnum in sharenums]),
+                    )
+
+class FakeBucketWriter:
+    # a diagnostic version of storageserver.BucketWriter
+    def __init__(self, size):
+        self.data = StringIO()
+        self.closed = False
+        self._size = size
+
+    def callRemote(self, methname, *args, **kwargs):
+        def _call():
+            meth = getattr(self, "remote_" + methname)
+            return meth(*args, **kwargs)
+        d = eventual.fireEventually()
+        d.addCallback(lambda res: _call())
+        return d
+
+    def remote_write(self, offset, data):
+        precondition(not self.closed)
+        precondition(offset >= 0)
+        precondition(offset+len(data) <= self._size,
+                     "offset=%d + data=%d > size=%d" %
+                     (offset, len(data), self._size))
+        self.data.seek(offset)
+        self.data.write(data)
+
+    def remote_close(self):
+        precondition(not self.closed)
+        self.closed = True
 
 class FakeClient:
     def __init__(self, mode="good"):
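
The fakes above simulate foolscap's callRemote by dispatching to a
local remote_* method through eventual.fireEventually(), so the fake
call completes on a later turn of the event loop instead of
synchronously, preserving the ordering a real remote call would have.
A self-contained sketch of the pattern:

    from foolscap import eventual

    class FakeRemote:
        def remote_ping(self):
            return "pong"

        def callRemote(self, methname, *args, **kwargs):
            def _call():
                meth = getattr(self, "remote_" + methname)
                return meth(*args, **kwargs)
            # fireEventually() returns a Deferred that fires on a
            # subsequent reactor turn, never in the same call stack
            d = eventual.fireEventually()
            d.addCallback(lambda res: _call())
            return d

    # usage: d = FakeRemote().callRemote("ping"); d fires with "pong"
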
diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py
index 5c327da4f27bfd10b32898985679272afaa4a564..404b86de025f8613c0fa21e38b6afc071ae29bec 100644
@@ -5,7 +5,7 @@ from twisted.application import service
 from foolscap import Referenceable
 
 from allmydata.util import idlib, hashutil
-from allmydata import encode, storageserver
+from allmydata import encode, storageserver, hashtree
 from allmydata.uri import pack_uri, pack_lit
 from allmydata.interfaces import IUploadable, IUploader
 from allmydata.Crypto.Cipher import AES
@@ -22,6 +22,13 @@ class HaveAllPeersError(Exception):
 class TooFullError(Exception):
     pass
 
+# our current uri_extension is 846 bytes for small files, a few bytes
+# more for larger ones (since the filesize is encoded in decimal in a
+# few places). Ask for a little bit more just in case we need it. If
+# the extension changes size, we can change EXTENSION_SIZE to
+# allocate a more accurate amount of space.
+EXTENSION_SIZE = 1000
+
 class PeerTracker:
     def __init__(self, peerid, permutedid, connection,
                  sharesize, blocksize, num_segments, num_share_hashes,
@@ -31,6 +38,13 @@ class PeerTracker:
         self.connection = connection # to an RIClient
         self.buckets = {} # k: shareid, v: IRemoteBucketWriter
         self.sharesize = sharesize
+        #print "PeerTracker", peerid, permutedid, sharesize
+        alloc_size = storageserver.allocated_size(sharesize,
+                                                  num_segments,
+                                                  num_share_hashes,
+                                                  EXTENSION_SIZE)
+        self.allocated_size = alloc_size
+
         self.blocksize = blocksize
         self.num_segments = num_segments
         self.num_share_hashes = num_share_hashes
@@ -47,10 +61,11 @@ class PeerTracker:
     def _got_storageserver(self, storageserver):
         self._storageserver = storageserver
     def _query(self, sharenums):
+        #print " query", self.peerid, len(sharenums)
         d = self._storageserver.callRemote("allocate_buckets",
                                            self.crypttext_hash,
-                                           sharenums, self.sharesize,
-                                           self.blocksize,
+                                           sharenums,
+                                           self.allocated_size,
                                            canary=Referenceable())
         d.addCallback(self._got_reply)
         return d
@@ -62,7 +77,8 @@ class PeerTracker:
             bp = storageserver.WriteBucketProxy(rref, self.sharesize,
                                                 self.blocksize,
                                                 self.num_segments,
-                                                self.num_share_hashes)
+                                                self.num_share_hashes,
+                                                EXTENSION_SIZE)
             b[sharenum] = bp
         self.buckets.update(b)
         return (alreadygot, set(b.keys()))
@@ -137,11 +153,16 @@ class FileUploader:
         # responsible for handling the data and sending out the shares.
         peers = self._client.get_permuted_peers(self._crypttext_hash)
         assert peers
+
         # TODO: eek, don't pull this from here, find a better way. gross.
         num_segments = self._encoder.uri_extension_data['num_segments']
-        from allmydata.util.mathutil import next_power_of_k
-        import math
-        num_share_hashes = max(int(math.log(next_power_of_k(self.total_shares,2),2)),1)
+        ht = hashtree.IncompleteHashTree(self.total_shares)
+        # this needed_hashes computation should mirror
+        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
+        # (instead of a HashTree) because we don't require actual hashing
+        # just to count the levels.
+        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
+
         trackers = [ PeerTracker(peerid, permutedid, conn,
                                  share_size, block_size,
                                  num_segments, num_share_hashes,
@@ -217,10 +238,11 @@ class FileUploader:
             if ring[0][1] == SHARE:
                 sharenums_to_query.add(ring[0][2])
             else:
-                d = peer.query(sharenums_to_query)
-                d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peer, sharenums_to_query), errbackArgs=(peer,))
-                outstanding_queries.append(d)
-                d.addErrback(log.err)
+                if True or sharenums_to_query:
+                    d = peer.query(sharenums_to_query)
+                    d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peer, sharenums_to_query), errbackArgs=(peer,))
+                    outstanding_queries.append(d)
+                    d.addErrback(log.err)
                 peer = ring[0][2]
                 sharenums_to_query = set()
             ring.rotate(-1)
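
The num_share_hashes change above replaces an ad-hoc
log2/next_power_of_k computation with a count taken from the same
hash-tree code the encoder itself uses: build an IncompleteHashTree
with one leaf per share, then count how many hashes the holder of a
single share needs (its own leaf plus the uncle chain toward the
root). Because an IncompleteHashTree holds no data yet, no actual
hashing is done just to get the count. A hedged sketch:

    from allmydata import hashtree

    total_shares = 100
    ht = hashtree.IncompleteHashTree(total_shares)
    # needed_hashes() returns the set of tree nodes required to
    # verify leaf 0 against the root; its size is the per-share
    # share-hash count, roughly log2(total_shares) + 1
    num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
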