move validation data to thingA, URI has storage_index plus thingA hash
authorBrian Warner <warner@allmydata.com>
Sat, 2 Jun 2007 01:48:01 +0000 (18:48 -0700)
committerBrian Warner <warner@allmydata.com>
Sat, 2 Jun 2007 01:48:01 +0000 (18:48 -0700)
This (compatibility-breaking) change moves much of the validation data and
encoding parameters out of the URI and into the so-called "thingA" block
(which will get a better name as soon as we find one we're comfortable with).
The URI retains the "storage_index" (a generalized term for the role that
we're currently using the verifierid for, the unique index for each file
that gets used by storage servers to decide which shares to return), the
decryption key, the needed_shares/total_shares counts (since they affect
peer selection), and the hash of the thingA block.

This shortens the URI and lets us add more kinds of validation data without
growing the URI (like plaintext merkle trees, to enable strong incremental
plaintext validation), at the cost of maybe 150 bytes of alacrity. Each
storage server holds an identical copy of the thingA block.

This is an incompatible change: new messages have been added to the storage
server interface, and the URI format has changed drastically.

src/allmydata/download.py
src/allmydata/encode.py
src/allmydata/hashtree.py
src/allmydata/interfaces.py
src/allmydata/storageserver.py
src/allmydata/test/test_encode.py
src/allmydata/test/test_system.py
src/allmydata/test/test_upload.py
src/allmydata/upload.py
src/allmydata/uri.py

index 134fe2656d2bc4ef0df339e7168b16f4a21e69ec..ab7ad40edbe2259b5ffac83ed01416319148add1 100644 (file)
@@ -5,7 +5,7 @@ from twisted.python import log
 from twisted.internet import defer
 from twisted.application import service
 
-from allmydata.util import idlib, mathutil
+from allmydata.util import idlib, mathutil, bencode
 from allmydata.util.assertutil import _assert
 from allmydata import codec, hashtree
 from allmydata.Crypto.Cipher import AES
@@ -19,6 +19,8 @@ class HaveAllPeersError(Exception):
     # we use this to jump out of the loop
     pass
 
+class BadThingAHashValue(Exception):
+    pass
 
 class Output:
     def __init__(self, downloadable, key):
@@ -223,55 +225,44 @@ class FileDownloader:
 
     def __init__(self, client, uri, downloadable):
         self._client = client
-        self._downloadable = downloadable
 
         d = unpack_uri(uri)
-        verifierid = d['verifierid']
-        size = d['size']
-        segment_size = d['segment_size']
-        assert isinstance(verifierid, str)
-        assert len(verifierid) == 20
-        self._verifierid = verifierid
-        self._fileid = d['fileid']
-        self._roothash = d['roothash']
-
-        self._codec = codec.get_decoder_by_name(d['codec_name'])
-        self._codec.set_serialized_params(d['codec_params'])
-        self._tail_codec = codec.get_decoder_by_name(d['codec_name'])
-        self._tail_codec.set_serialized_params(d['tail_codec_params'])
-
-
-        self._total_segments = mathutil.div_ceil(size, segment_size)
-        self._current_segnum = 0
-        self._segment_size = segment_size
-        self._size = size
-        self._num_needed_shares = self._codec.get_needed_shares()
+        self._storage_index = d['storage_index']
+        self._thingA_hash = d['thingA_hash']
+        self._total_shares = d['total_shares']
+        self._size = d['size']
+        self._num_needed_shares = d['needed_shares']
 
         self._output = Output(downloadable, d['key'])
 
-        self._share_hashtree = hashtree.IncompleteHashTree(d['total_shares'])
-        self._share_hashtree.set_hashes({0: self._roothash})
-
         self.active_buckets = {} # k: shnum, v: bucket
-        self._share_buckets = {} # k: shnum, v: set of buckets
+        self._share_buckets = [] # list of (sharenum, bucket) tuples
+        self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
+        self._thingA_sources = []
+
+        self._thingA_data = None
 
     def start(self):
-        log.msg("starting download [%s]" % (idlib.b2a(self._verifierid),))
+        log.msg("starting download [%s]" % idlib.b2a(self._storage_index))
 
         # first step: who should we download from?
         d = defer.maybeDeferred(self._get_all_shareholders)
         d.addCallback(self._got_all_shareholders)
-        # once we know that, we can download blocks from them
+        # now get the thingA block from somebody and validate it
+        d.addCallback(self._obtain_thingA)
+        d.addCallback(self._got_thingA)
+        d.addCallback(self._create_validated_buckets)
+        # once we know that, we can download blocks from everybody
         d.addCallback(self._download_all_segments)
         d.addCallback(self._done)
         return d
 
     def _get_all_shareholders(self):
         dl = []
-        for (permutedpeerid, peerid, connection) in self._client.get_permuted_peers(self._verifierid):
+        for (permutedpeerid, peerid, connection) in self._client.get_permuted_peers(self._storage_index):
             d = connection.callRemote("get_service", "storageserver")
             d.addCallback(lambda ss: ss.callRemote("get_buckets",
-                                                   self._verifierid))
+                                                   self._storage_index))
             d.addCallbacks(self._got_response, self._got_error,
                            callbackArgs=(connection,))
             dl.append(d)
@@ -281,13 +272,11 @@ class FileDownloader:
         _assert(isinstance(buckets, dict), buckets) # soon foolscap will check this for us with its DictOf schema constraint
         for sharenum, bucket in buckets.iteritems():
             self.add_share_bucket(sharenum, bucket)
+            self._thingA_sources.append(bucket)
 
     def add_share_bucket(self, sharenum, bucket):
-        vbucket = ValidatedBucket(sharenum, bucket,
-                                  self._share_hashtree,
-                                  self._roothash,
-                                  self._total_segments)
-        self._share_buckets.setdefault(sharenum, set()).add(vbucket)
+        # this is split out for the benefit of test_encode.py
+        self._share_buckets.append( (sharenum, bucket) )
 
     def _got_error(self, f):
         self._client.log("Somebody failed. -- %s" % (f,))
@@ -295,23 +284,78 @@ class FileDownloader:
     def bucket_failed(self, vbucket):
         shnum = vbucket.sharenum
         del self.active_buckets[shnum]
-        s = self._share_buckets[shnum]
+        s = self._share_vbuckets[shnum]
         # s is a set of ValidatedBucket instances
         s.remove(vbucket)
         # ... which might now be empty
         if not s:
             # there are no more buckets which can provide this share, so
             # remove the key. This may prompt us to use a different share.
-            del self._share_buckets[shnum]
+            del self._share_vbuckets[shnum]
 
     def _got_all_shareholders(self, res):
         if len(self._share_buckets) < self._num_needed_shares:
             raise NotEnoughPeersError
-        for s in self._share_buckets.values():
-            for vb in s:
-                assert isinstance(vb, ValidatedBucket), \
-                       "vb is %s but should be a ValidatedBucket" % (vb,)
+        #for s in self._share_vbuckets.values():
+        #    for vb in s:
+        #        assert isinstance(vb, ValidatedBucket), \
+        #               "vb is %s but should be a ValidatedBucket" % (vb,)
+
+    def _obtain_thingA(self, ignored=None):
+        # all shareholders are supposed to have a copy of thingA, and all are
+        # supposed to be identical. We compute the hash of the data that
+        # comes back, and compare it against the version in our URI. If they
+        # don't match, ignore their data and try someone else.
+        if not self._thingA_sources:
+            raise NotEnoughPeersError("ran out of peers while fetching thingA")
+        bucket = self._thingA_sources.pop()
+        d = bucket.callRemote("get_thingA")
+        def _got(thingA):
+            h = hashtree.thingA_hash(thingA)
+            if h != self._thingA_hash:
+                msg = ("The copy of thingA we received from %s was bad" %
+                       bucket)
+                raise BadThingAHashValue(msg)
+            return bencode.bdecode(thingA)
+        d.addCallback(_got)
+        def _bad(f):
+            log.msg("thingA from vbucket %s failed: %s" % (bucket, f)) # WEIRD
+            # try again with a different one
+            return self._obtain_thingA()
+        d.addErrback(_bad)
+        return d
+
+    def _got_thingA(self, thingA_data):
+        d = self._thingA_data = thingA_data
+
+        self._codec = codec.get_decoder_by_name(d['codec_name'])
+        self._codec.set_serialized_params(d['codec_params'])
+        self._tail_codec = codec.get_decoder_by_name(d['codec_name'])
+        self._tail_codec.set_serialized_params(d['tail_codec_params'])
+
+        verifierid = d['verifierid']
+        assert isinstance(verifierid, str)
+        assert len(verifierid) == 20
+        self._verifierid = verifierid
+        self._fileid = d['fileid']
+        self._roothash = d['share_root_hash']
+
+        self._segment_size = segment_size = d['segment_size']
+        self._total_segments = mathutil.div_ceil(self._size, segment_size)
+        self._current_segnum = 0
+
+        self._share_hashtree = hashtree.IncompleteHashTree(d['total_shares'])
+        self._share_hashtree.set_hashes({0: self._roothash})
 
+    def _create_validated_buckets(self, ignored=None):
+        self._share_vbuckets = {}
+        for sharenum, bucket in self._share_buckets:
+            vbucket = ValidatedBucket(sharenum, bucket,
+                                      self._share_hashtree,
+                                      self._roothash,
+                                      self._total_segments)
+            s = self._share_vbuckets.setdefault(sharenum, set())
+            s.add(vbucket)
 
     def _activate_enough_buckets(self):
         """either return a mapping from shnum to a ValidatedBucket that can
@@ -320,23 +364,23 @@ class FileDownloader:
         while len(self.active_buckets) < self._num_needed_shares:
             # need some more
             handled_shnums = set(self.active_buckets.keys())
-            available_shnums = set(self._share_buckets.keys())
+            available_shnums = set(self._share_vbuckets.keys())
             potential_shnums = list(available_shnums - handled_shnums)
             if not potential_shnums:
                 raise NotEnoughPeersError
             # choose a random share
             shnum = random.choice(potential_shnums)
             # and a random bucket that will provide it
-            validated_bucket = random.choice(list(self._share_buckets[shnum]))
+            validated_bucket = random.choice(list(self._share_vbuckets[shnum]))
             self.active_buckets[shnum] = validated_bucket
         return self.active_buckets
 
 
     def _download_all_segments(self, res):
-        # the promise: upon entry to this function, self._share_buckets
+        # the promise: upon entry to this function, self._share_vbuckets
         # contains enough buckets to complete the download, and some extra
         # ones to tolerate some buckets dropping out or having errors.
-        # self._share_buckets is a dictionary that maps from shnum to a set
+        # self._share_vbuckets is a dictionary that maps from shnum to a set
         # of ValidatedBuckets, which themselves are wrappers around
         # RIBucketReader references.
         self.active_buckets = {} # k: shnum, v: ValidatedBucket instance
index 35a7e18b978252e34dd1a43abbe8afff85629d4d..5581b5fc12c7d30b8e561cdf481f9399f8caa2c8 100644 (file)
@@ -3,9 +3,9 @@
 from zope.interface import implements
 from twisted.internet import defer
 from twisted.python import log
-from allmydata.hashtree import HashTree, block_hash
+from allmydata.hashtree import HashTree, block_hash, thingA_hash
 from allmydata.Crypto.Cipher import AES
-from allmydata.util import mathutil
+from allmydata.util import mathutil, bencode
 from allmydata.util.assertutil import _assert
 from allmydata.codec import CRSEncoder
 from allmydata.interfaces import IEncoder
@@ -78,6 +78,7 @@ class Encoder(object):
                           (self.NEEDED_SHARES, self.TOTAL_SHARES))
         self.NEEDED_SHARES = k
         self.TOTAL_SHARES = n
+        self.thingA_data = {}
 
     def setup(self, infile, encryption_key):
         self.infile = infile
@@ -103,6 +104,15 @@ class Encoder(object):
         self._codec.set_params(self.segment_size,
                                self.required_shares, self.num_shares)
 
+        data = self.thingA_data
+        data['codec_name'] = self._codec.get_encoder_type()
+        data['codec_params'] = self._codec.get_serialized_params()
+
+        data['size'] = self.file_size
+        data['segment_size'] = self.segment_size
+        data['needed_shares'] = self.required_shares
+        data['total_shares'] = self.num_shares
+
         # the "tail" is the last segment. This segment may or may not be
         # shorter than all other segments. We use the "tail codec" to handle
         # it. If the tail is short, we use a different codec instance. In
@@ -118,6 +128,10 @@ class Encoder(object):
         self._tail_codec = CRSEncoder()
         self._tail_codec.set_params(padded_tail_size,
                                     self.required_shares, self.num_shares)
+        data['tail_codec_params'] = self._tail_codec.get_serialized_params()
+
+    def set_thingA_data(self, thingA_data):
+        self.thingA_data.update(thingA_data)
 
     def get_share_size(self):
         share_size = mathutil.div_ceil(self.file_size, self.required_shares)
@@ -156,6 +170,7 @@ class Encoder(object):
 
         d.addCallback(lambda res: self.send_all_subshare_hash_trees())
         d.addCallback(lambda res: self.send_all_share_hash_trees())
+        d.addCallback(lambda res: self.send_thingA_to_all_shareholders())
         d.addCallback(lambda res: self.close_all_shareholders())
         d.addCallbacks(lambda res: self.done(), self.err)
         return d
@@ -277,7 +292,7 @@ class Encoder(object):
         # create the share hash tree
         t = HashTree(self.share_root_hashes)
         # the root of this hash tree goes into our URI
-        self.root_hash = t[0]
+        self.thingA_data['share_root_hash'] = t[0]
         # now send just the necessary pieces out to each shareholder
         for i in range(self.num_shares):
             # the HashTree is given a list of leaves: 0,1,2,3..n .
@@ -293,6 +308,18 @@ class Encoder(object):
         sh = self.landlords[shareid]
         return sh.callRemote("put_share_hashes", needed_hashes)
 
+    def send_thingA_to_all_shareholders(self):
+        log.msg("%s: sending thingA" % self)
+        thingA = bencode.bencode(self.thingA_data)
+        self.thingA_hash = thingA_hash(thingA)
+        dl = []
+        for sh in self.landlords.values():
+            dl.append(self.send_thingA(sh, thingA))
+        return defer.DeferredList(dl)
+
+    def send_thingA(self, sh, thingA):
+        return sh.callRemote("put_thingA", thingA)
+
     def close_all_shareholders(self):
         log.msg("%s: closing shareholders" % self)
         dl = []
@@ -302,7 +329,7 @@ class Encoder(object):
 
     def done(self):
         log.msg("%s: upload done" % self)
-        return self.root_hash
+        return self.thingA_hash
 
     def err(self, f):
         log.msg("%s: upload failed: %s" % (self, f))
index c03c90d023e6f8203c5f31134eacfa625c2e005d..2640cfc1766cb4a4bf6c4e482344580dd3d29e53 100644 (file)
@@ -439,3 +439,6 @@ class IncompleteHashTree(CompleteBinaryTreeMixin, list):
 
 def block_hash(data):
     return tagged_hash("encoded subshare", data)
+
+def thingA_hash(data):
+    return tagged_hash("thingA", data)
index 73b53673600803121792dc97ac7f5131d375b363..f419145b8702116f77f2239bb1e8eedc6358e856 100644 (file)
@@ -12,9 +12,11 @@ Nodeid = StringConstraint(maxLength=20,
                           minLength=20) # binary format 20-byte SHA1 hash
 FURL = StringConstraint(1000)
 Verifierid = StringConstraint(20)
+StorageIndex = StringConstraint(32)
 URI = StringConstraint(300) # kind of arbitrary
 MAX_BUCKETS = 200  # per peer
-ShareData = StringConstraint(100000)
+ShareData = StringConstraint(100000) # 2MB segment / k=25
+ThingAData = StringConstraint(1000)
 
 class RIIntroducerClient(RemoteInterface):
     def new_peers(furls=SetOf(FURL)):
@@ -56,6 +58,16 @@ class RIBucketWriter(RemoteInterface):
     def put_share_hashes(sharehashes=ListOf(TupleOf(int, Hash), maxLength=2**20)):
         return None
 
+    def put_thingA(data=ThingAData):
+        """This as-yet-unnamed block of data contains integrity-checking
+        information (hashes of plaintext, crypttext, and shares), as well as
+        encoding parameters that are necessary to recover the data. This is a
+        bencoded dict mapping strings to other strings. The hash of this data
+        is kept in the URI and verified before any of the data is used. All
+        buckets for a given file contain identical copies of this data.
+        """
+        return None
+
     def close():
         """
         If the data that has been written is incomplete or inconsistent then
@@ -74,9 +86,12 @@ class RIBucketReader(RemoteInterface):
         return ListOf(Hash, maxLength=2**20)
     def get_share_hashes():
         return ListOf(TupleOf(int, Hash), maxLength=2**20)
+    def get_thingA():
+        return ThingAData
+
 
 class RIStorageServer(RemoteInterface):
-    def allocate_buckets(verifierid=Verifierid,
+    def allocate_buckets(storage_index=StorageIndex,
                          sharenums=SetOf(int, maxLength=MAX_BUCKETS),
                          sharesize=int, blocksize=int, canary=Referenceable):
         """
@@ -86,7 +101,7 @@ class RIStorageServer(RemoteInterface):
         """
         return TupleOf(SetOf(int, maxLength=MAX_BUCKETS),
                        DictOf(int, RIBucketWriter, maxKeys=MAX_BUCKETS))
-    def get_buckets(verifierid=Verifierid):
+    def get_buckets(storage_index=StorageIndex):
         return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS)
 
 # hm, we need a solution for forward references in schemas
@@ -377,7 +392,7 @@ class IEncoder(Interface):
         input file, encrypting it, encoding the pieces, uploading the shares
         to the shareholders, then sending the hash trees.
 
-        I return a Deferred that fires with the root hash.
+        I return a Deferred that fires with the hash of the thingA data block.
         """
 
 class IDecoder(Interface):
index 1b0dd032c3d86eb30b132fab4a014cc667e35a7b..310a965e53532d6977120dd86283598c5a935722 100644 (file)
@@ -11,13 +11,13 @@ from allmydata.util import bencode, fileutil, idlib
 from allmydata.util.assertutil import precondition
 
 # store/
-# store/incoming # temp dirs named $VERIFIERID/$SHARENUM which will be moved to store/$VERIFIERID/$SHARENUM on success
-# store/$VERIFIERID
-# store/$VERIFIERID/$SHARENUM
-# store/$VERIFIERID/$SHARENUM/blocksize
-# store/$VERIFIERID/$SHARENUM/data
-# store/$VERIFIERID/$SHARENUM/blockhashes
-# store/$VERIFIERID/$SHARENUM/sharehashtree
+# store/incoming # temp dirs named $STORAGEINDEX/$SHARENUM which will be moved to store/$STORAGEINDEX/$SHARENUM on success
+# store/$STORAGEINDEX
+# store/$STORAGEINDEX/$SHARENUM
+# store/$STORAGEINDEX/$SHARENUM/blocksize
+# store/$STORAGEINDEX/$SHARENUM/data
+# store/$STORAGEINDEX/$SHARENUM/blockhashes
+# store/$STORAGEINDEX/$SHARENUM/sharehashtree
 
 # $SHARENUM matches this regex:
 NUM_RE=re.compile("[0-9]*")
@@ -59,6 +59,10 @@ class BucketWriter(Referenceable):
         precondition(not self.closed)
         self._write_file('sharehashes', bencode.bencode(sharehashes))
 
+    def remote_put_thingA(self, data):
+        precondition(not self.closed)
+        self._write_file('thingA', data)
+
     def remote_close(self):
         precondition(not self.closed)
         # TODO assert or check the completeness and consistency of the data that has been written
@@ -100,6 +104,9 @@ class BucketReader(Referenceable):
         # schema
         return [tuple(i) for i in hashes]
 
+    def remote_get_thingA(self):
+        return self._read_file('thingA')
+
 class StorageServer(service.MultiService, Referenceable):
     implements(RIStorageServer)
     name = 'storageserver'
@@ -116,13 +123,13 @@ class StorageServer(service.MultiService, Referenceable):
     def _clean_incomplete(self):
         fileutil.rm_dir(self.incomingdir)
 
-    def remote_allocate_buckets(self, verifierid, sharenums, sharesize,
+    def remote_allocate_buckets(self, storage_index, sharenums, sharesize,
                                 blocksize, canary):
         alreadygot = set()
         bucketwriters = {} # k: shnum, v: BucketWriter
         for shnum in sharenums:
-            incominghome = os.path.join(self.incomingdir, idlib.b2a(verifierid), "%d"%shnum)
-            finalhome = os.path.join(self.storedir, idlib.b2a(verifierid), "%d"%shnum)
+            incominghome = os.path.join(self.incomingdir, idlib.b2a(storage_index), "%d"%shnum)
+            finalhome = os.path.join(self.storedir, idlib.b2a(storage_index), "%d"%shnum)
             if os.path.exists(incominghome) or os.path.exists(finalhome):
                 alreadygot.add(shnum)
             else:
@@ -130,13 +137,13 @@ class StorageServer(service.MultiService, Referenceable):
             
         return alreadygot, bucketwriters
 
-    def remote_get_buckets(self, verifierid):
+    def remote_get_buckets(self, storage_index):
         bucketreaders = {} # k: sharenum, v: BucketReader
-        verifierdir = os.path.join(self.storedir, idlib.b2a(verifierid))
+        storagedir = os.path.join(self.storedir, idlib.b2a(storage_index))
         try:
-            for f in os.listdir(verifierdir):
+            for f in os.listdir(storagedir):
                 if NUM_RE.match(f):
-                    bucketreaders[int(f)] = BucketReader(os.path.join(verifierdir, f))
+                    bucketreaders[int(f)] = BucketReader(os.path.join(storagedir, f))
         except OSError:
             # Commonly caused by there being no buckets at all.
             pass
index 58022de6440e04fc6633505ef7221fe1630b76d6..7c69d135ca7531e0b7354be32fac481cf4296cff 100644 (file)
@@ -5,6 +5,7 @@ from twisted.internet import defer
 from twisted.python.failure import Failure
 from foolscap import eventual
 from allmydata import encode, download
+from allmydata.util import bencode
 from allmydata.uri import pack_uri
 from cStringIO import StringIO
 
@@ -70,6 +71,10 @@ class FakeBucketWriter:
         assert self.share_hashes is None
         self.share_hashes = sharehashes
 
+    def put_thingA(self, thingA):
+        assert not self.closed
+        self.thingA = thingA
+
     def close(self):
         assert not self.closed
         self.closed = True
@@ -78,7 +83,7 @@ class FakeBucketWriter:
         return good[:-1] + chr(ord(good[-1]) ^ 0x01)
 
     def get_block(self, blocknum):
-        assert isinstance(blocknum, int)
+        assert isinstance(blocknum, (int, long))
         if self.mode == "bad block":
             return self.flip_bit(self.blocks[blocknum])
         return self.blocks[blocknum]
@@ -238,28 +243,41 @@ class Roundtrip(unittest.TestCase):
             shareholders[shnum] = peer
             all_shareholders.append(peer)
         e.set_shareholders(shareholders)
+        e.set_thingA_data({'verifierid': "V" * 20,
+                           'fileid': "F" * 20,
+                           })
         d = e.start()
-        def _uploaded(roothash):
-            URI = pack_uri(codec_name=e._codec.get_encoder_type(),
-                           codec_params=e._codec.get_serialized_params(),
-                           tail_codec_params=e._tail_codec.get_serialized_params(),
-                           verifierid="V" * 20,
-                           fileid="F" * 20,
+        def _uploaded(thingA_hash):
+            URI = pack_uri(storage_index="S" * 20,
                            key=nonkey,
-                           roothash=roothash,
+                           thingA_hash=thingA_hash,
                            needed_shares=e.required_shares,
                            total_shares=e.num_shares,
-                           size=e.file_size,
-                           segment_size=e.segment_size)
+                           size=e.file_size)
             client = None
             target = download.Data()
             fd = download.FileDownloader(client, URI, target)
             fd.check_verifierid = False
             fd.check_fileid = False
+            # grab a copy of thingA from one of the shareholders
+            thingA = shareholders[0].thingA
+            thingA_data = bencode.bdecode(thingA)
+            NOTthingA = {'codec_name': e._codec.get_encoder_type(),
+                      'codec_params': e._codec.get_serialized_params(),
+                      'tail_codec_params': e._tail_codec.get_serialized_params(),
+                      'verifierid': "V" * 20,
+                      'fileid': "F" * 20,
+                         #'share_root_hash': roothash,
+                      'segment_size': e.segment_size,
+                      'needed_shares': e.required_shares,
+                      'total_shares': e.num_shares,
+                      }
+            fd._got_thingA(thingA_data)
             for shnum in range(AVAILABLE_SHARES):
                 bucket = all_shareholders[shnum]
                 fd.add_share_bucket(shnum, bucket)
             fd._got_all_shareholders(None)
+            fd._create_validated_buckets(None)
             d2 = fd._download_all_segments(None)
             d2.addCallback(fd._done)
             return d2
index 7106aa42d26a7cfbda35bf3dba9c813338dddf8e..868b3ad1bc7eeac71f3fc899716284ecaad92a6b 100644 (file)
@@ -213,17 +213,20 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
         return good[:-1] + chr(ord(good[-1]) ^ 0x01)
 
     def mangle_uri(self, gooduri):
-        # change the verifierid, which means we'll be asking about the wrong
-        # file, so nobody will have any shares
+        # change the storage index, which means we'll be asking about the
+        # wrong file, so nobody will have any shares
         d = uri.unpack_uri(gooduri)
-        assert len(d['verifierid']) == 20
-        d['verifierid'] = self.flip_bit(d['verifierid'])
+        assert len(d['storage_index']) == 20
+        d['storage_index'] = self.flip_bit(d['storage_index'])
         return uri.pack_uri(**d)
 
-    # TODO: add a test which mangles the fileid instead, and should fail in
-    # the post-download phase when the file's integrity check fails. Do the
-    # same thing for the key, which should cause the download to fail the
-    # post-download verifierid check.
+    # TODO: add a test which mangles the thingA_hash instead, and should fail
+    # due to not being able to get a valid thingA block. Also a test which
+    # sneakily mangles the thingA block to change some of the validation
+    # data, so it will fail in the post-download phase when the file's
+    # crypttext integrity check fails. Do the same thing for the key, which
+    # should cause the download to fail the post-download plaintext
+    # verifierid check.
 
     def test_vdrive(self):
         self.basedir = "test_system/SystemTest/test_vdrive"
index efae86d48763fa431d7ff34708a732b9ebee6065..37386cebbbf83593e7236c47755ef30a37ba0ae3 100644 (file)
@@ -25,13 +25,10 @@ class GoodServer(unittest.TestCase):
         self.failUnless(isinstance(uri, str))
         self.failUnless(uri.startswith("URI:"))
         d = unpack_uri(uri)
-        self.failUnless(isinstance(d['verifierid'], str))
-        self.failUnlessEqual(len(d['verifierid']), 20)
-        self.failUnless(isinstance(d['fileid'], str))
-        self.failUnlessEqual(len(d['fileid']), 20)
+        self.failUnless(isinstance(d['storage_index'], str))
+        self.failUnlessEqual(len(d['storage_index']), 20)
         self.failUnless(isinstance(d['key'], str))
         self.failUnlessEqual(len(d['key']), 16)
-        self.failUnless(isinstance(d['codec_params'], str))
 
     def testData(self):
         data = "This is some data to upload"
index dc11d77f04de3ff643175276150f9089b938bc51..51d40896050f69190b3595eaf34258e7db27864c 100644 (file)
@@ -237,23 +237,21 @@ class FileUploader:
             buckets.update(peer.buckets)
         assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
         self._encoder.set_shareholders(buckets)
+
+        thingA_data = {}
+        thingA_data['verifierid'] = self._verifierid
+        thingA_data['fileid'] = self._fileid
+        self._encoder.set_thingA_data(thingA_data)
         return self._encoder.start()
 
-    def _compute_uri(self, roothash):
-        codec_type = self._encoder._codec.get_encoder_type()
-        codec_params = self._encoder._codec.get_serialized_params()
-        tail_codec_params = self._encoder._tail_codec.get_serialized_params()
-        return pack_uri(codec_name=codec_type,
-                        codec_params=codec_params,
-                        tail_codec_params=tail_codec_params,
-                        verifierid=self._verifierid,
-                        fileid=self._fileid,
+    def _compute_uri(self, thingA_hash):
+        return pack_uri(storage_index=self._verifierid,
                         key=self._encryption_key,
-                        roothash=roothash,
+                        thingA_hash=thingA_hash,
                         needed_shares=self.needed_shares,
                         total_shares=self.total_shares,
                         size=self._size,
-                        segment_size=self._encoder.segment_size)
+                        )
 
 
 def netstring(s):
index 96fca1da4281d25fea7c9a3199dfaf4910fa9981..9cfce767d7df45b10ab08488b76fbc6176b7f63e 100644 (file)
@@ -5,39 +5,39 @@ from allmydata.util import idlib
 # enough information to retrieve and validate the contents. It shall be
 # expressed in a limited character set (namely [TODO]).
 
-def pack_uri(codec_name, codec_params, tail_codec_params,
-             verifierid, fileid, key,
-             roothash, needed_shares, total_shares, size, segment_size):
+def pack_uri(storage_index, key, thingA_hash,
+             needed_shares, total_shares, size):
     # applications should pass keyword parameters into this
-    assert isinstance(codec_name, str)
-    assert len(codec_name) < 10
-    assert ":" not in codec_name
-    assert isinstance(codec_params, str)
-    assert ":" not in codec_params
-    assert isinstance(tail_codec_params, str)
-    assert ":" not in tail_codec_params
-    assert isinstance(verifierid, str)
-    assert len(verifierid) == 20 # sha1 hash
-    assert isinstance(fileid, str)
-    assert len(fileid) == 20 # sha1 hash
+    assert isinstance(storage_index, str)
+    assert len(storage_index) == 20 # sha1 hash. TODO: sha256
+
+    assert isinstance(thingA_hash, str)
+    assert len(thingA_hash) == 32 # sha56 hash
+
     assert isinstance(key, str)
     assert len(key) == 16 # AES-128
-    return "URI:%s:%s:%s:%s:%s:%s:%s:%s:%s:%s:%s" % (codec_name, codec_params, tail_codec_params, idlib.b2a(verifierid), idlib.b2a(fileid), idlib.b2a(key), idlib.b2a(roothash), needed_shares, total_shares, size, segment_size)
+    assert isinstance(needed_shares, int)
+    assert isinstance(total_shares, int)
+    assert isinstance(size, (int,long))
+
+    return "URI:%s:%s:%s:%d:%d:%d" % (idlib.b2a(storage_index), idlib.b2a(key),
+                                      idlib.b2a(thingA_hash),
+                                      needed_shares, total_shares, size)
 
 
 def unpack_uri(uri):
     assert uri.startswith("URI:")
     d = {}
-    header, d['codec_name'], d['codec_params'], d['tail_codec_params'], verifierid_s, fileid_s, key_s, roothash_s, needed_shares_s, total_shares_s, size_s, segment_size_s = uri.split(":")
+    (header,
+     storage_index_s, key_s, thingA_hash_s,
+     needed_shares_s, total_shares_s, size_s) = uri.split(":")
     assert header == "URI"
-    d['verifierid'] = idlib.a2b(verifierid_s)
-    d['fileid'] = idlib.a2b(fileid_s)
+    d['storage_index'] = idlib.a2b(storage_index_s)
     d['key'] = idlib.a2b(key_s)
-    d['roothash'] = idlib.a2b(roothash_s)
+    d['thingA_hash'] = idlib.a2b(thingA_hash_s)
     d['needed_shares'] = int(needed_shares_s)
     d['total_shares'] = int(total_shares_s)
     d['size'] = int(size_s)
-    d['segment_size'] = int(segment_size_s)
     return d