git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
allow the introducer to set default encoding parameters. Closes #84.
author Brian Warner <warner@allmydata.com>
Thu, 12 Jul 2007 22:33:30 +0000 (15:33 -0700)
committer Brian Warner <warner@allmydata.com>
Thu, 12 Jul 2007 22:33:30 +0000 (15:33 -0700)
By writing something like "25 75 100" into a file named 'encoding_parameters'
in the central Introducer's base directory, all clients which use that
introducer will be advised to use 25-out-of-100 encoding for files (i.e.
100 shares will be produced, 25 are required to reconstruct, and the upload
process will be happy if it can find homes for at least 75 shares). The
default values are "3 7 10". For small meshes, the defaults are probably
good, but for larger ones it may be appropriate to increase the number of
shares.

src/allmydata/client.py
src/allmydata/codec.py
src/allmydata/encode.py
src/allmydata/interfaces.py
src/allmydata/introducer.py
src/allmydata/introducer_and_vdrive.py
src/allmydata/test/test_introducer_and_vdrive.py
src/allmydata/test/test_system.py
src/allmydata/test/test_upload.py
src/allmydata/upload.py

index fc7b092b08d9e5b6ea1f39fa749797e06dac7a6e..21c66f3aef0fe47fa2a1576f7a19897919f753f1 100644 (file)
@@ -150,6 +150,11 @@ class Client(node.Node, Referenceable):
         results.sort()
         return results
 
+    def get_encoding_parameters(self):
+        if not self.introducer_client:
+            return None
+        return self.introducer_client.encoding_parameters
+
     def connected_to_introducer(self):
         if self.introducer_client:
             return self.introducer_client.connected_to_introducer()
index 324e6a18d1a8ab52926ff23c99cc2646c45397e9..6ef84a34b8af21c3f96da6cf993ab8dd1e395327 100644 (file)
@@ -113,9 +113,13 @@ class CRSDecoder(object):
         return self.required_shares
 
     def decode(self, some_shares, their_shareids):
-        precondition(len(some_shares) == len(their_shareids), len(some_shares), len(their_shareids))
-        precondition(len(some_shares) == self.required_shares, len(some_shares), self.required_shares)
-        return defer.succeed(self.decoder.decode(some_shares, [int(s) for s in their_shareids]))
+        precondition(len(some_shares) == len(their_shareids),
+                     len(some_shares), len(their_shareids))
+        precondition(len(some_shares) == self.required_shares,
+                     len(some_shares), self.required_shares)
+        data = self.decoder.decode(some_shares,
+                                   [int(s) for s in their_shareids])
+        return defer.succeed(data)
 
 
 all_encoders = {
index feb78992f618c5c1a7b2cdbffb16231a71c70362..830b5b63b590ceea18da5ecb8d3664f67b12f597 100644 (file)
@@ -88,6 +88,12 @@ class Encoder(object):
         self.TOTAL_SHARES = n
         self.uri_extension_data = {}
 
+    def set_params(self, encoding_parameters):
+        k,d,n = encoding_parameters
+        self.NEEDED_SHARES = k
+        self.SHARES_OF_HAPPINESS = d
+        self.TOTAL_SHARES = n
+
     def setup(self, infile, encryption_key):
         self.infile = infile
         assert isinstance(encryption_key, str)
index 1f95fa829c420daecc0c1f9adea4374786298ef9..8d0afac2599b1ed89df11124de600b313f8cc6b7 100644 (file)
@@ -20,6 +20,22 @@ URIExtensionData = StringConstraint(1000)
 class RIIntroducerClient(RemoteInterface):
     def new_peers(furls=SetOf(FURL)):
         return None
+    def set_encoding_parameters(parameters=(int, int, int)):
+        """Advise the client of the recommended k-of-n encoding parameters
+        for this grid. 'parameters' is a tuple of (k, desired, n), where 'n'
+        is the total number of shares that will be created for any given
+        file, while 'k' is the number of shares that must be retrieved to
+        recover that file, and 'desired' is the minimum number of shares that
+        must be placed before the uploader will consider its job a success.
+        n/k is the expansion ratio, while k determines the robustness.
+
+        Introducers should specify 'n' according to the expected size of the
+        grid (there is no point to producing more shares than there are
+        peers), and k according to the desired reliability-vs-overhead goals.
+
+        Note that the current encoding technology requires k>=2.
+        """
+        return None
 
 class RIIntroducer(RemoteInterface):
     def hello(node=RIIntroducerClient, furl=FURL):
index ccda1e71b04412d1216cfaf7c557739308d1dffe..668be165b9b79ce6fc19d22eed5101f76a6b5ce4 100644 (file)
@@ -9,11 +9,16 @@ from allmydata.util import idlib, observer
 
 class Introducer(service.MultiService, Referenceable):
     implements(RIIntroducer)
+    name = "introducer"
 
     def __init__(self):
         service.MultiService.__init__(self)
         self.nodes = set()
         self.furls = set()
+        self._encoding_parameters = None
+
+    def set_encoding_parameters(self, parameters):
+        self._encoding_parameters = parameters
 
     def remote_hello(self, node, furl):
         log.msg("introducer: new contact at %s, node is %s" % (furl, node))
@@ -24,6 +29,9 @@ class Introducer(service.MultiService, Referenceable):
         node.notifyOnDisconnect(_remove)
         self.furls.add(furl)
         node.callRemote("new_peers", self.furls)
+        if self._encoding_parameters is not None:
+            node.callRemote("set_encoding_parameters",
+                            self._encoding_parameters)
         for othernode in self.nodes:
             othernode.callRemote("new_peers", set([furl]))
         self.nodes.add(node)
@@ -42,6 +50,7 @@ class IntroducerClient(service.Service, Referenceable):
         self._connected = False
 
         self.connection_observers = observer.ObserverList()
+        self.encoding_parameters = None
 
     def startService(self):
         service.Service.startService(self)
@@ -60,6 +69,9 @@ class IntroducerClient(service.Service, Referenceable):
         for furl in furls:
             self._new_peer(furl)
 
+    def remote_set_encoding_parameters(self, parameters):
+        self.encoding_parameters = parameters
+
     def stopService(self):
         service.Service.stopService(self)
         self.introducer_reconnector.stopConnecting()
index 65995be6ae470d90f815da6137e8f445ef3650ea..27e83835183bed5c0d654de34e6d2fa66cc6df3e 100644 (file)
@@ -9,13 +9,17 @@ class IntroducerAndVdrive(node.Node):
     PORTNUMFILE = "introducer.port"
     NODETYPE = "introducer"
     VDRIVEDIR = "vdrive"
+    ENCODING_PARAMETERS_FILE = "encoding_parameters"
+    DEFAULT_K, DEFAULT_DESIRED, DEFAULT_N = 3, 7, 10
 
     def __init__(self, basedir="."):
         node.Node.__init__(self, basedir)
         self.urls = {}
+        self.read_encoding_parameters()
 
     def tub_ready(self):
-        r = self.add_service(Introducer())
+        i = Introducer()
+        r = self.add_service(i)
         self.urls["introducer"] = self.tub.registerReference(r, "introducer")
         self.log(" introducer is at %s" % self.urls["introducer"])
         f = open(os.path.join(self.basedir, "introducer.furl"), "w")
@@ -32,3 +36,17 @@ class IntroducerAndVdrive(node.Node):
         f.write(self.urls["vdrive"] + "\n")
         f.close()
 
+        encoding_parameters = self.read_encoding_parameters()
+        i.set_encoding_parameters(encoding_parameters)
+
+    def read_encoding_parameters(self):
+        k, desired, n = self.DEFAULT_K, self.DEFAULT_DESIRED, self.DEFAULT_N
+        PARAM_FILE = os.path.join(self.basedir, self.ENCODING_PARAMETERS_FILE)
+        if os.path.exists(PARAM_FILE):
+            f = open(PARAM_FILE, "r")
+            data = f.read().strip()
+            f.close()
+            k,desired,n = data.split()
+            k = int(k); desired = int(desired); n = int(n)
+        return k, desired, n
+
index c85d20aee4310917d772022b98a0ff3517fd65db..75eea86ded9c6c67e0200138e2eff2e2a4987e0c 100644 (file)
@@ -1,4 +1,5 @@
 
+import os
 from twisted.trial import unittest
 from foolscap.eventual import fireEventually, flushEventualQueue
 
@@ -7,9 +8,34 @@ from allmydata.util import testutil
 
 class Basic(testutil.SignalMixin, unittest.TestCase):
     def test_loadable(self):
-        q = introducer_and_vdrive.IntroducerAndVdrive()
+        basedir = "introducer_and_vdrive.Basic.test_loadable"
+        os.mkdir(basedir)
+        q = introducer_and_vdrive.IntroducerAndVdrive(basedir)
         d = fireEventually(None)
         d.addCallback(lambda res: q.startService())
+        d.addCallback(lambda res: q.when_tub_ready())
+        def _check_parameters(res):
+            i = q.getServiceNamed("introducer")
+            self.failUnlessEqual(i._encoding_parameters, (3, 7, 10))
+        d.addCallback(_check_parameters)
+        d.addCallback(lambda res: q.stopService())
+        d.addCallback(flushEventualQueue)
+        return d
+
+    def test_set_parameters(self):
+        basedir = "introducer_and_vdrive.Basic.test_set_parameters"
+        os.mkdir(basedir)
+        f = open(os.path.join(basedir, "encoding_parameters"), "w")
+        f.write("25 75 100")
+        f.close()
+        q = introducer_and_vdrive.IntroducerAndVdrive(basedir)
+        d = fireEventually(None)
+        d.addCallback(lambda res: q.startService())
+        d.addCallback(lambda res: q.when_tub_ready())
+        def _check_parameters(res):
+            i = q.getServiceNamed("introducer")
+            self.failUnlessEqual(i._encoding_parameters, (25, 75, 100))
+        d.addCallback(_check_parameters)
         d.addCallback(lambda res: q.stopService())
         d.addCallback(flushEventualQueue)
         return d
index 066746cce2134698ecc1e96a96349530c6613f10..45bb9e6abd48e6abab213cdddb58a701058fc29b 100644 (file)
@@ -573,8 +573,8 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
         self.failUnless("size: %d\n" % len(self.data) in output)
         self.failUnless("num_segments: 1\n" in output)
         # segment_size is always a multiple of needed_shares
-        self.failUnless("segment_size: 125\n" in output)
-        self.failUnless("total_shares: 100\n" in output)
+        self.failUnless("segment_size: 114\n" in output)
+        self.failUnless("total_shares: 10\n" in output)
         # keys which are supposed to be present
         for key in ("size", "num_segments", "segment_size",
                     "needed_shares", "total_shares",
index 43548831bfbc5ea9663876fa6fb6d9ceb2b1fc41..6b508513a1f979b0108e5a2362ef4eab9599d231 100644 (file)
@@ -14,6 +14,8 @@ class FakeClient:
     def get_permuted_peers(self, storage_index):
         return [ ("%20d"%fakeid, "%20d"%fakeid, FakePeer(self.mode),)
                  for fakeid in range(50) ]
+    def get_encoding_parameters(self):
+        return None
 
 DATA = """
 Once upon a time, there was a beautiful princess named Buttercup. She lived
index 7b5a6b060eb0f35deba365d754cb3678d011153c..33afc0a23070ffe7d3db3e2cbfced5b285c96bb0 100644 (file)
@@ -65,7 +65,10 @@ class FileUploader:
         self._client = client
         self._options = options
 
-    def set_params(self, needed_shares, shares_of_happiness, total_shares):
+    def set_params(self, encoding_parameters):
+        self._encoding_parameters = encoding_parameters
+
+        needed_shares, shares_of_happiness, total_shares = encoding_parameters
         self.needed_shares = needed_shares
         self.shares_of_happiness = shares_of_happiness
         self.total_shares = total_shares
@@ -111,6 +114,7 @@ class FileUploader:
 
     def setup_encoder(self):
         self._encoder = encode.Encoder(self._options)
+        self._encoder.set_params(self._encoding_parameters)
         self._encoder.setup(self._filehandle, self._encryption_key)
         share_size = self._encoder.get_share_size()
         block_size = self._encoder.get_block_size()
@@ -313,9 +317,12 @@ class Uploader(service.MultiService):
     uploader_class = FileUploader
     URI_LIT_SIZE_THRESHOLD = 55
 
-    needed_shares = 25 # Number of shares required to reconstruct a file.
-    desired_shares = 75 # We will abort an upload unless we can allocate space for at least this many.
-    total_shares = 100 # Total number of shares created by encoding.  If everybody has room then this is is how many we will upload.
+    DEFAULT_ENCODING_PARAMETERS = (25, 75, 100)
+    # this is a tuple of (needed, desired, total). 'needed' is the number of
+    # shares required to reconstruct a file. 'desired' means that we will
+    # abort an upload unless we can allocate space for at least this many.
+    # 'total' is the total number of shares created by encoding. If everybody
+    # has room then this is how many we will upload.
 
     def compute_id_strings(self, f):
         # return a list of (plaintext_hash, encryptionkey, crypttext_hash)
@@ -366,8 +373,10 @@ class Uploader(service.MultiService):
         else:
             u = self.uploader_class(self.parent, options)
             u.set_filehandle(fh)
-            u.set_params(self.needed_shares, self.desired_shares,
-                         self.total_shares)
+            encoding_parameters = self.parent.get_encoding_parameters()
+            if not encoding_parameters:
+                encoding_parameters = self.DEFAULT_ENCODING_PARAMETERS
+            u.set_params(encoding_parameters)
             plaintext_hash, key, crypttext_hash = self.compute_id_strings(fh)
             u.set_encryption_key(key)
             u.set_id_strings(crypttext_hash, plaintext_hash)