]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
rearrange encode/upload, add URIs, switch to ReplicatingEncoder
authorBrian Warner <warner@allmydata.com>
Tue, 16 Jan 2007 04:22:22 +0000 (21:22 -0700)
committerBrian Warner <warner@allmydata.com>
Tue, 16 Jan 2007 04:22:22 +0000 (21:22 -0700)
Added metadata to the bucket store, which is used to hold the share number
(but the bucket doesn't know that, it just gets a string).

Modified the codec interfaces a bit.

Try to pass around URIs to/from download/upload instead of verifierids.
URI format is still in flux.

Change the current (primitive) file encoder to use a ReplicatingEncoder
because it provides ICodecEncoder. We will be moving to the (less primitive)
file encoder (currently in allmydata.encode_new) eventually, but for now
this change lets us test out PyRS or zooko's upcoming C-based RS codec in
something larger than a single unit test. This primitive file encoder only
uses a single segment, and has no merkle trees.

Also added allmydata.util.deferredutil for a DeferredList wrapper that
errbacks (but only when all component Deferreds have fired) if there were
any errors, which unfortunately is not a behavior available from the standard
DeferredList.

15 files changed:
src/allmydata/bucketstore.py
src/allmydata/client.py
src/allmydata/codec.py
src/allmydata/download.py
src/allmydata/encode_new.py
src/allmydata/filetable.py
src/allmydata/interfaces.py
src/allmydata/test/test_client.py
src/allmydata/test/test_codec.py
src/allmydata/test/test_system.py
src/allmydata/test/test_upload.py
src/allmydata/upload.py
src/allmydata/util/deferredutil.py [new file with mode: 0644]
src/allmydata/util/idlib.py
src/allmydata/vdrive.py

index 08ea9dc96661d72afc796249e6daeb0c02fa2e9c..923b8e813c537258db418a2c04e12951bd532832 100644 (file)
@@ -2,16 +2,13 @@ import os
 
 from foolscap import Referenceable
 from twisted.application import service
-from twisted.python.failure import Failure
+from twisted.python import log
 from allmydata.util import idlib
 from zope.interface import implements
 from allmydata.interfaces import RIBucketWriter, RIBucketReader
 
 from allmydata.util.assertutil import precondition, _assert
 
-class NoSuchBucketError(Failure):
-    pass
-
 class BucketStore(service.MultiService, Referenceable):
     def __init__(self, store_dir):
         precondition(os.path.isdir(store_dir))
@@ -94,6 +91,8 @@ class WriteBucket(Bucket):
     def __init__(self, bucket_dir, verifierid, bucket_num, size):
         Bucket.__init__(self, bucket_dir, verifierid)
         precondition(not os.path.exists(bucket_dir))
+        #log.msg("WriteBucket [%s]: creating bucket %s"
+        #        % (idlib.b2a(verifierid), bucket_dir))
         os.mkdir(bucket_dir)
 
         self._open = True
@@ -119,6 +118,8 @@ class WriteBucket(Bucket):
 
     def close(self):
         precondition(self._bytes_written == self._size)
+        #log.msg("WriteBucket.close [%s] (%s)"
+        #        % (idlib.b2a(self._verifierid), self._bucket_dir))
         self._data.close()
         self._write_attr('closed', '')
         self._open = False
index 360cdc93a4bf467c867e6bafcdea8aad58b5bf3f..203961ec51b32b04542c188f6b4d08056abf0040 100644 (file)
@@ -140,6 +140,7 @@ class Client(node.Node, Referenceable):
         # sort of at most max_count elements
         results = []
         for nodeid in self.all_peers:
+            assert isinstance(nodeid, str)
             permuted = sha.new(key + nodeid).digest()
             results.append((permuted, nodeid))
         results.sort()
index e8d99f1871935fa52672a564d0e267fc3ae674df..e330fe2e55d675b1ec8b363f379a75993f09b818 100644 (file)
@@ -14,10 +14,10 @@ class ReplicatingEncoder(object):
     implements(ICodecEncoder)
     ENCODER_TYPE = 0
 
-    def set_params(self, data_size, required_shares, total_shares):
+    def set_params(self, data_size, required_shares, max_shares):
         self.data_size = data_size
         self.required_shares = required_shares
-        self.total_shares = total_shares
+        self.max_shares = max_shares
 
     def get_encoder_type(self):
         return self.ENCODER_TYPE
@@ -28,8 +28,11 @@ class ReplicatingEncoder(object):
     def get_share_size(self):
         return self.data_size
 
-    def encode(self, data):
-        shares = [(i,data) for i in range(self.total_shares)]
+    def encode(self, data, num_shares=None):
+        if num_shares is None:
+            num_shares = self.max_shares
+        assert num_shares <= self.max_shares
+        shares = [(i,data) for i in range(num_shares)]
         return defer.succeed(shares)
 
 class ReplicatingDecoder(object):
@@ -38,6 +41,9 @@ class ReplicatingDecoder(object):
     def set_serialized_params(self, params):
         self.required_shares = int(params)
 
+    def get_required_shares(self):
+        return self.required_shares
+
     def decode(self, some_shares):
         assert len(some_shares) >= self.required_shares
         data = some_shares[0][1]
@@ -117,32 +123,38 @@ class PyRSEncoder(object):
     # than 20 minutes to run the test_encode_share tests, so I disabled most
     # of them. (uh, hello, it's running figleaf)
 
-    def set_params(self, data_size, required_shares, total_shares):
-        assert required_shares <= total_shares
+    def set_params(self, data_size, required_shares, max_shares):
+        assert required_shares <= max_shares
         self.data_size = data_size
         self.required_shares = required_shares
-        self.total_shares = total_shares
+        self.max_shares = max_shares
         self.chunk_size = required_shares
         self.num_chunks = mathutil.div_ceil(data_size, self.chunk_size)
         self.last_chunk_padding = mathutil.pad_size(data_size, required_shares)
         self.share_size = self.num_chunks
-        self.encoder = rs_code.RSCode(total_shares, required_shares, 8)
+        self.encoder = rs_code.RSCode(max_shares, required_shares, 8)
 
     def get_encoder_type(self):
         return self.ENCODER_TYPE
 
     def get_serialized_params(self):
         return "%d:%d:%d" % (self.data_size, self.required_shares,
-                             self.total_shares)
+                             self.max_shares)
 
     def get_share_size(self):
         return self.share_size
 
-    def encode(self, data):
-        share_data = [ [] for i in range(self.total_shares)]
+    def encode(self, data, num_shares=None):
+        if num_shares is None:
+            num_shares = self.max_shares
+        assert num_shares <= self.max_shares
+        # we create self.max_shares shares, then throw out any extra ones
+        # so that we always return exactly num_shares shares.
+
+        share_data = [ [] for i in range(self.max_shares)]
         for i in range(self.num_chunks):
             # we take self.chunk_size bytes from the input string, and
-            # turn it into self.total_shares bytes.
+            # turn it into self.max_shares bytes.
             offset = i*self.chunk_size
             # Note string slices aren't an efficient way to use memory, so
             # when we upgrade from the unusably slow py_ecc prototype to a
@@ -155,12 +167,12 @@ class PyRSEncoder(object):
             input_vector = [ord(x) for x in chunk]
             assert len(input_vector) == self.required_shares
             output_vector = self.encoder.Encode(input_vector)
-            assert len(output_vector) == self.total_shares
+            assert len(output_vector) == self.max_shares
             for i2,out in enumerate(output_vector):
                 share_data[i2].append(chr(out))
 
         shares = [ (i, "".join(share_data[i]))
-                   for i in range(self.total_shares) ]
+                   for i in range(num_shares) ]
         return defer.succeed(shares)
 
 class PyRSDecoder(object):
@@ -170,23 +182,26 @@ class PyRSDecoder(object):
         pieces = params.split(":")
         self.data_size = int(pieces[0])
         self.required_shares = int(pieces[1])
-        self.total_shares = int(pieces[2])
+        self.max_shares = int(pieces[2])
 
         self.chunk_size = self.required_shares
         self.num_chunks = mathutil.div_ceil(self.data_size, self.chunk_size)
         self.last_chunk_padding = mathutil.pad_size(self.data_size,
                                                     self.required_shares)
         self.share_size = self.num_chunks
-        self.encoder = rs_code.RSCode(self.total_shares, self.required_shares,
+        self.encoder = rs_code.RSCode(self.max_shares, self.required_shares,
                                       8)
         if False:
             print "chunk_size: %d" % self.chunk_size
             print "num_chunks: %d" % self.num_chunks
             print "last_chunk_padding: %d" % self.last_chunk_padding
             print "share_size: %d" % self.share_size
-            print "total_shares: %d" % self.total_shares
+            print "max_shares: %d" % self.max_shares
             print "required_shares: %d" % self.required_shares
 
+    def get_required_shares(self):
+        return self.required_shares
+
     def decode(self, some_shares):
         chunk_size = self.chunk_size
         assert len(some_shares) >= self.required_shares
@@ -198,7 +213,7 @@ class PyRSDecoder(object):
             # this takes one byte from each share, and turns the combination
             # into a single chunk
             received_vector = []
-            for j in range(self.total_shares):
+            for j in range(self.max_shares):
                 share = have_shares.get(j)
                 if share is not None:
                     received_vector.append(ord(share[i]))
index 8646f788f1b7c447df810b4ac35408ac99d81c44..2b5fca7655cf7314d1dd75043e1aaa7088cf1055 100644 (file)
@@ -1,11 +1,12 @@
 
-import os
+import os, sha
 from zope.interface import Interface, implements
 from twisted.python import failure, log
 from twisted.internet import defer
 from twisted.application import service
 
-from allmydata.util import idlib
+from allmydata.util import idlib, bencode
+from allmydata.util.deferredutil import DeferredListShouldSucceed
 from allmydata import codec
 
 class NotEnoughPeersError(Exception):
@@ -15,13 +16,21 @@ class HaveAllPeersError(Exception):
     # we use this to jump out of the loop
     pass
 
+def unpack_uri(uri):
+    assert uri.startswith("URI:")
+    return bencode.bdecode(uri[4:])
+
 class FileDownloader:
     debug = False
 
-    def __init__(self, peer, verifierid):
+    def __init__(self, peer, verifierid, encoding_params):
         self._peer = peer
         assert isinstance(verifierid, str)
+        assert len(verifierid) == 20
         self._verifierid = verifierid
+        self._decoder = codec.ReplicatingDecoder()
+        self._decoder.set_serialized_params(encoding_params)
+        self.needed_shares = self._decoder.get_required_shares()
 
     def set_download_target(self, target):
         self._target = target
@@ -30,15 +39,8 @@ class FileDownloader:
     def _cancel(self):
         pass
 
-    def make_decoder(self):
-        n = self._shares = 4
-        k = self._desired_shares = 2
-        self._target.open()
-        self._decoder = codec.Decoder(self._target, k, n,
-                                      self._verifierid)
-
     def start(self):
-        log.msg("starting download")
+        log.msg("starting download [%s]" % (idlib.b2a(self._verifierid),))
         if self.debug:
             print "starting download"
         # first step: who should we download from?
@@ -75,7 +77,7 @@ class FileDownloader:
                                                              bucket_nums)
 
                     self.landlords.append( (peerid, buckets) )
-                if len(self.landlords) >= self._desired_shares:
+                if len(self.landlords) >= self.needed_shares:
                     if self.debug: print " we're done!"
                     raise HaveAllPeersError
                 # otherwise we fall through to search more peers
@@ -107,7 +109,34 @@ class FileDownloader:
         all_buckets = []
         for peerid, buckets in self.landlords:
             all_buckets.extend(buckets)
-        d = self._decoder.start(all_buckets)
+        # TODO: try to avoid pulling multiple shares from the same peer
+        all_buckets = all_buckets[:self.needed_shares]
+        # retrieve all shares
+        dl = []
+        shares = []
+        for (bucket_num, bucket) in all_buckets:
+            d0 = bucket.callRemote("get_metadata")
+            d1 = bucket.callRemote("read")
+            d2 = DeferredListShouldSucceed([d0, d1])
+            def _got(res):
+                sharenum_s, sharedata = res
+                sharenum = bencode.bdecode(sharenum_s)
+                shares.append((sharenum, sharedata))
+            d2.addCallback(_got)
+            dl.append(d2)
+        d = DeferredListShouldSucceed(dl)
+
+        d.addCallback(lambda res: self._decoder.decode(shares))
+
+        def _write(data):
+            self._target.open()
+            hasher = sha.new(netstring("allmydata_v1_verifierid"))
+            hasher.update(data)
+            vid = hasher.digest()
+            assert self._verifierid == vid, "%s != %s" % (idlib.b2a(self._verifierid), idlib.b2a(vid))
+            self._target.write(data)
+        d.addCallback(_write)
+
         def _done(res):
             self._target.close()
             return self._target.finish()
@@ -204,26 +233,29 @@ class Downloader(service.MultiService):
     """
     implements(IDownloader)
     name = "downloader"
+    debug = False
 
-    def download(self, verifierid, t):
+    def download(self, uri, t):
+        (verifierid, params) = unpack_uri(uri)
         assert self.parent
         assert self.running
         assert isinstance(verifierid, str)
         t = IDownloadTarget(t)
         assert t.write
         assert t.close
-        dl = FileDownloader(self.parent, verifierid)
+        dl = FileDownloader(self.parent, verifierid, params)
         dl.set_download_target(t)
-        dl.make_decoder()
+        if self.debug:
+            dl.debug = True
         d = dl.start()
         return d
 
     # utility functions
-    def download_to_data(self, verifierid):
-        return self.download(verifierid, Data())
-    def download_to_filename(self, verifierid, filename):
-        return self.download(verifierid, FileName(filename))
-    def download_to_filehandle(self, verifierid, filehandle):
-        return self.download(verifierid, FileHandle(filehandle))
+    def download_to_data(self, uri):
+        return self.download(uri, Data())
+    def download_to_filename(self, uri, filename):
+        return self.download(uri, FileName(filename))
+    def download_to_filehandle(self, uri, filehandle):
+        return self.download(uri, FileHandle(filehandle))
 
 
index 0da54501c0d8c1cb522e05150039acb137bbc72c..df09f6d418f8a6ef86d8229084a35d4c2bca8439 100644 (file)
@@ -129,6 +129,7 @@ class Encoder(object):
         segment_plaintext = self.infile.read(self.segment_size)
         segment_crypttext = self.cryptor.encrypt(segment_plaintext)
         del segment_plaintext
+        assert self.encoder.max_shares == self.num_shares
         d = self.encoder.encode(segment_crypttext)
         d.addCallback(self._encoded_segment)
         return d
index bb96b02ad5804e5cff52131d26443cade3b1c4c9..7b8c0e19173f4b6c4c4492386b80202640cf466d 100644 (file)
@@ -74,10 +74,10 @@ class MutableDirectoryNode(Referenceable):
         return self.make_subnode(absname)
     remote_add_directory = add_directory
 
-    def add_file(self, name, data):
+    def add_file(self, name, uri):
         self.validate_name(name)
         f = open(os.path.join(self._basedir, name), "wb")
-        f.write(data)
+        f.write(uri)
         f.close()
     remote_add_file = add_file
 
index 396a75d24bec7b0474b7cff469a52f7f883039bf..246e3d7fb502ee7c879c2989263d206c346359ac 100644 (file)
@@ -6,6 +6,7 @@ from foolscap import RemoteInterface
 Nodeid = StringConstraint(20) # binary format 20-byte SHA1 hash
 PBURL = StringConstraint(150)
 Verifierid = StringConstraint(20)
+URI = StringConstraint(100) # kind of arbitrary
 ShareData = StringConstraint(100000)
 # these four are here because Foolscap does not yet support the kind of
 # restriction I really want to apply to these.
@@ -65,7 +66,7 @@ class RIMutableDirectoryNode(RemoteInterface):
     def add_directory(name=str):
         return RIMutableDirectoryNode_
 
-    def add_file(name=str, data=Verifierid):
+    def add_file(name=str, uri=URI):
         return None
 
     def remove(name=str):
@@ -75,7 +76,7 @@ class RIMutableDirectoryNode(RemoteInterface):
 
 
 class ICodecEncoder(Interface):
-    def set_params(data_size, required_shares, total_shares):
+    def set_params(data_size, required_shares, max_shares):
         """Set up the parameters of this encoder.
 
         See encode() for a description of how these parameters are used.
@@ -109,28 +110,58 @@ class ICodecEncoder(Interface):
         """Return the length of the shares that encode() will produce.
         """
 
-    def encode(data):
+    def encode(data, num_shares=None):
         """Encode a chunk of data. This may be called multiple times. Each
         call is independent.
 
         The data must be a string with a length that exactly matches the
         data_size promised by set_params().
 
+        'num_shares', if provided, must be equal or less than the
+        'max_shares' set in set_params. If 'num_shares' is left at None, this
+        method will produce 'max_shares' shares. This can be used to minimize
+        the work that the encoder needs to do if we initially thought that we
+        would need, say, 100 shares, but now that it is time to actually
+        encode the data we only have 75 peers to send data to.
+
         For each call, encode() will return a Deferred that fires with a list
         of 'total_shares' tuples. Each tuple is of the form (sharenum,
-        share), where sharenum is an int (from 0 total_shares-1), and share
-        is a string. The get_share_size() method can be used to determine the
-        length of the 'share' strings returned by encode().
+        sharedata), where sharenum is an int (from 0 total_shares-1), and
+        sharedata is a string. The get_share_size() method can be used to
+        determine the length of the 'sharedata' strings returned by encode().
+
+        The (sharenum, sharedata) tuple must be kept together during storage
+        and retrieval. Specifically, the share data is useless by itself: the
+        decoder needs to be told which share is which by providing it with
+        both the share number and the actual share data.
 
         The memory usage of this function is expected to be on the order of
         total_shares * get_share_size().
         """
+        # design note: we could embed the share number in the sharedata by
+        # returning bencode((sharenum,sharedata)). The advantage would be
+        # making it easier to keep these two pieces together, and probably
+        # avoiding a round trip when reading the remote bucket (although this
+        # could be achieved by changing RIBucketReader.read to
+        # read_data_and_metadata). The disadvantage is that the share number
+        # wants to be exposed to the storage/bucket layer (specifically to
+        # handle the next stage of peer-selection algorithm in which we
+        # propose to keep share#A on a given peer and they are allowed to
+        # tell us that they already have share#B). Also doing this would make
+        # the share size somewhat variable (one-digit sharenumbers will be a
+        # byte shorter than two-digit sharenumbers), unless we zero-pad the
+        # sharenumbers based upon the max_total_shares declared in
+        # set_params.
 
 class ICodecDecoder(Interface):
     def set_serialized_params(params):
         """Set up the parameters of this encoder, from a string returned by
         encoder.get_serialized_params()."""
 
+    def get_required_shares():
+        """Return the number of shares needed to reconstruct the data.
+        set_serialized_params() must be called before this."""
+
     def decode(some_shares):
         """Decode a partial list of shares into data.
 
index c067c303ce96304392c898a20dd1849bf7f3bf0f..5cd01e06ad75f47c8f6111cfea62067f086dc6a2 100644 (file)
@@ -17,3 +17,8 @@ class Basic(unittest.TestCase):
         self.failUnlessEqual(c.permute_peerids("two"), ['0','4','2','1','3'])
         c.all_peers = []
         self.failUnlessEqual(c.permute_peerids("one"), [])
+
+        c2 = client.Client("")
+        c2.all_peers = ["%d" % i for i in range(5)]
+        self.failUnlessEqual(c2.permute_peerids("one"), ['3','1','0','4','2'])
+
index cec61bb312ba374f791bf82b25e9e4effbf052d1..d073ae8c113aa8d281a414d9bd5e73076d4e9f39 100644 (file)
@@ -10,17 +10,23 @@ class Tester:
     #enc_class = PyRSEncoder
     #dec_class = PyRSDecoder
 
-    def do_test(self, size, required_shares, total_shares):
+    def do_test(self, size, required_shares, max_shares, fewer_shares=None):
         data0 = os.urandom(size)
         enc = self.enc_class()
-        enc.set_params(size, required_shares, total_shares)
+        enc.set_params(size, required_shares, max_shares)
         serialized_params = enc.get_serialized_params()
         log.msg("serialized_params: %s" % serialized_params)
         d = enc.encode(data0)
-        def _done(shares):
-            self.failUnlessEqual(len(shares), total_shares)
+        def _done_encoding_all(shares):
+            self.failUnlessEqual(len(shares), max_shares)
             self.shares = shares
-        d.addCallback(_done)
+        d.addCallback(_done_encoding_all)
+        if fewer_shares is not None:
+            # also validate that the num_shares= parameter works
+            d.addCallback(lambda res: enc.encode(data0, fewer_shares))
+            def _check_fewer_shares(some_shares):
+                self.failUnlessEqual(len(some_shares), fewer_shares)
+            d.addCallback(_check_fewer_shares)
 
         def _decode(shares):
             dec = self.dec_class()
@@ -91,7 +97,7 @@ class Tester:
     def test_encode2(self):
         if os.uname()[1] == "slave3" and self.enc_class == PyRSEncoder:
             raise unittest.SkipTest("slave3 is really slow")
-        return self.do_test(123, 25, 100)
+        return self.do_test(123, 25, 100, 90)
 
     def test_sizes(self):
         raise unittest.SkipTest("omg this would take forever")
@@ -114,13 +120,13 @@ class BenchPyRS(unittest.TestCase):
     def test_big(self):
         size = 10000
         required_shares = 25
-        total_shares = 100
+        max_shares = 100
         # this lets us use a persistent lookup table, stored outside the
         # _trial_temp directory (which is deleted each time trial is run)
         os.symlink("../ffield.lut.8", "ffield.lut.8")
         enc = self.enc_class()
         self.start()
-        enc.set_params(size, required_shares, total_shares)
+        enc.set_params(size, required_shares, max_shares)
         serialized_params = enc.get_serialized_params()
         print "encoder ready", self.stop()
         self.start()
@@ -132,7 +138,7 @@ class BenchPyRS(unittest.TestCase):
             now_shares = time.time()
             print "shares ready", self.stop()
             self.start()
-            self.failUnlessEqual(len(shares), total_shares)
+            self.failUnlessEqual(len(shares), max_shares)
         d.addCallback(_done)
         d.addCallback(lambda res: enc.encode(data0))
         d.addCallback(_done)
index 6c43460c61d82b7fe9973488fb8951efc2de66ce..5ca474e29ea7783a6a9b6b8fb1ef31fa20ff3b2a 100644 (file)
@@ -116,16 +116,20 @@ class SystemTest(unittest.TestCase):
             d1 = u.upload_data(DATA)
             return d1
         d.addCallback(_do_upload)
-        def _upload_done(verifierid):
-            log.msg("upload finished: verifierid=%s" % idlib.b2a(verifierid))
+        def _upload_done(uri):
+            log.msg("upload finished: uri is %s" % (uri,))
             dl = self.clients[1].getServiceNamed("downloader")
-            d1 = dl.download_to_data(verifierid)
+            d1 = dl.download_to_data(uri)
             return d1
         d.addCallback(_upload_done)
         def _download_done(data):
             log.msg("download finished")
             self.failUnlessEqual(data, DATA)
         d.addCallback(_download_done)
+        def _oops(res):
+            log.msg("oops, an error orccurred, finishing: %s" % res)
+            return res
+        d.addErrback(_oops)
         return d
     test_upload_and_download.timeout = 20
 
index 9d3e44c8524bebd2e25da9c8b04b8f8a8ea0993f..d0f74560e6f532d7baaedf75c30bbf46d6477797 100644 (file)
@@ -1,9 +1,10 @@
 
 from twisted.trial import unittest
 from twisted.internet import defer
+from twisted.application import service
 from cStringIO import StringIO
 
-from allmydata import upload
+from allmydata import upload, download
 
 class StringBucketProxy:
     # This is for unit tests: make a StringIO look like a RIBucketWriter.
@@ -64,8 +65,10 @@ class FakeClient:
             return defer.fail(IndexError("no connection to that peer"))
         return defer.succeed(peer)
 
+
 class NextPeerUploader(upload.FileUploader):
-    def _got_all_peers(self, res):
+    _size = 100
+    def _got_enough_peers(self, res):
         return res
 
 class NextPeer(unittest.TestCase):
@@ -81,12 +84,12 @@ class NextPeer(unittest.TestCase):
                for peerid, bucketnum in expected]
         self.failUnlessEqual(u.landlords, exp)
 
+    VERIFIERID = "\x00" * 20
     def test_0(self):
         c = FakeClient([])
         u = NextPeerUploader(c)
-        u._verifierid = "verifierid"
-        u._shares = 2
-        u._share_size = 100
+        u.set_verifierid(self.VERIFIERID)
+        u.set_params(2, 2, 2)
         d = u.start()
         def _check(f):
             f.trap(upload.NotEnoughPeersError)
@@ -97,9 +100,8 @@ class NextPeer(unittest.TestCase):
     def test_1(self):
         c = FakeClient(self.responses)
         u = NextPeerUploader(c)
-        u._verifierid = "verifierid"
-        u._shares = 2
-        u._share_size = 100
+        u.set_verifierid(self.VERIFIERID)
+        u.set_params(2, 2, 2)
         d = u.start()
         def _check(res):
             self.failUnlessEqual(u.goodness_points, 2)
@@ -112,9 +114,8 @@ class NextPeer(unittest.TestCase):
     def test_2(self):
         c = FakeClient(self.responses)
         u = NextPeerUploader(c)
-        u._verifierid = "verifierid"
-        u._shares = 3
-        u._share_size = 100
+        u.set_verifierid(self.VERIFIERID)
+        u.set_params(3, 3, 3)
         d = u.start()
         def _check(res):
             self.failUnlessEqual(u.goodness_points, 3)
@@ -135,9 +136,8 @@ class NextPeer(unittest.TestCase):
     def test_3(self):
         c = FakeClient(self.responses2)
         u = NextPeerUploader(c)
-        u._verifierid = "verifierid"
-        u._shares = 3
-        u._share_size = 100
+        u.set_verifierid(self.VERIFIERID)
+        u.set_params(3, 3, 3)
         d = u.start()
         def _check(res):
             self.failUnlessEqual(u.goodness_points, 3)
@@ -158,9 +158,8 @@ class NextPeer(unittest.TestCase):
     def test_4(self):
         c = FakeClient(self.responses3)
         u = NextPeerUploader(c)
-        u._verifierid = "verifierid"
-        u._shares = 4
-        u._share_size = 100
+        u.set_verifierid(self.VERIFIERID)
+        u.set_params(4, 4, 4)
         d = u.start()
         def _check(res):
             self.failUnlessEqual(u.goodness_points, 4)
@@ -171,3 +170,88 @@ class NextPeer(unittest.TestCase):
                                           ])
         d.addCallback(_check)
         return d
+
+
+class FakePeer2:
+    def __init__(self, peerid):
+        self.peerid = peerid
+        self.data = ""
+
+    def callRemote(self, methname, *args, **kwargs):
+        if methname == "allocate_bucket":
+            return defer.maybeDeferred(self._allocate_bucket, *args, **kwargs)
+        if methname == "write":
+            return defer.maybeDeferred(self._write, *args, **kwargs)
+        if methname == "set_metadata":
+            return defer.maybeDeferred(self._set_metadata, *args, **kwargs)
+        if methname == "close":
+            return defer.maybeDeferred(self._close, *args, **kwargs)
+        return defer.maybeDeferred(self._bad_name, methname)
+
+    def _allocate_bucket(self, verifierid, bucket_num, size, leaser, canary):
+        self.allocated_size = size
+        return self
+    def _write(self, data):
+        self.data = self.data + data
+    def _set_metadata(self, metadata):
+        self.metadata = metadata
+    def _close(self):
+        pass
+    def _bad_name(self, methname):
+        raise NameError("FakePeer2 has no such method named '%s'" % methname)
+
+class FakeClient2:
+    nodeid = "fakeclient"
+    def __init__(self, max_peers):
+        self.peers = []
+        for peerid in range(max_peers):
+            self.peers.append(FakePeer2(str(peerid)))
+
+    def permute_peerids(self, key, max_peers):
+        assert max_peers == None
+        return [str(i) for i in range(len(self.peers))]
+
+    def get_remote_service(self, peerid, name):
+        peer = self.peers[int(peerid)]
+        if not peer:
+            return defer.fail(IndexError("no connection to that peer"))
+        return defer.succeed(peer)
+
+class Uploader(unittest.TestCase):
+    def setUp(self):
+        node = self.node = FakeClient2(10)
+        u = self.u = upload.Uploader()
+        u.running = 1
+        u.parent = node
+
+    def _check(self, uri):
+        self.failUnless(isinstance(uri, str))
+        self.failUnless(uri.startswith("URI:"))
+        verifierid, params = download.unpack_uri(uri)
+        self.failUnless(isinstance(verifierid, str))
+        self.failUnlessEqual(len(verifierid), 20)
+        self.failUnless(isinstance(params, str))
+        peers = self.node.peers
+        self.failUnlessEqual(peers[0].allocated_size,
+                             len(peers[0].data))
+    def testData(self):
+        data = "This is some data to upload"
+        d = self.u.upload_data(data)
+        d.addCallback(self._check)
+        return d
+
+    def testFileHandle(self):
+        data = "This is some data to upload"
+        d = self.u.upload_filehandle(StringIO(data))
+        d.addCallback(self._check)
+        return d
+
+    def testFilename(self):
+        fn = "Uploader-testFilename.data"
+        f = open(fn, "w")
+        data = "This is some data to upload"
+        f.write(data)
+        f.close()
+        d = self.u.upload_filename(fn)
+        d.addCallback(self._check)
+        return d
index 28908318f2c3ba59385cbfe82c66aea7efd2435d..5121bc930a08afe69d9172be881b535f221d76a1 100644 (file)
@@ -5,7 +5,8 @@ from twisted.internet import defer
 from twisted.application import service
 from foolscap import Referenceable
 
-from allmydata.util import idlib
+from allmydata.util import idlib, bencode
+from allmydata.util.deferredutil import DeferredListShouldSucceed
 from allmydata import codec
 
 from cStringIO import StringIO
@@ -22,54 +23,84 @@ class HaveAllPeersError(Exception):
 class TooFullError(Exception):
     pass
 
+
 class FileUploader:
     debug = False
 
     def __init__(self, peer):
         self._peer = peer
 
+    def set_params(self, min_shares, target_goodness, max_shares):
+        self.min_shares = min_shares
+        self.target_goodness = target_goodness
+        self.max_shares = max_shares
+
     def set_filehandle(self, filehandle):
         self._filehandle = filehandle
         filehandle.seek(0, 2)
         self._size = filehandle.tell()
         filehandle.seek(0)
 
-    def make_encoder(self):
-        self._needed_shares = 4
-        self._shares = 4
-        self._encoder = codec.Encoder(self._filehandle, self._shares)
-        self._share_size = self._size
-
     def set_verifierid(self, vid):
         assert isinstance(vid, str)
+        assert len(vid) == 20
         self._verifierid = vid
 
 
     def start(self):
-        log.msg("starting upload")
+        """Start uploading the file.
+
+        The source of the data to be uploaded must have been set before this
+        point by calling set_filehandle().
+
+        This method returns a Deferred that will fire with the URI (a
+        string)."""
+
+        log.msg("starting upload [%s]" % (idlib.b2a(self._verifierid),))
         if self.debug:
             print "starting upload"
+        assert self.min_shares
+        assert self.target_goodness
+
+        # create the encoder, so we can know how large the shares will be
+        total_shares = self.max_shares
+        needed_shares = self.min_shares
+        self._encoder = codec.ReplicatingEncoder()
+        self._encoder.set_params(self._size, needed_shares, total_shares)
+        self._share_size = self._encoder.get_share_size()
+
         # first step: who should we upload to?
 
-        # maybe limit max_peers to 2*len(self.shares), to reduce memory
-        # footprint
+        # We will talk to at most max_peers (which can be None to mean no
+        # limit). Maybe limit max_peers to 2*len(self.shares), to reduce
+        # memory footprint. For now, make it unlimited.
         max_peers = None
 
         self.permuted = self._peer.permute_peerids(self._verifierid, max_peers)
+
         self._total_peers = len(self.permuted)
         for p in self.permuted:
             assert isinstance(p, str)
         # we will shrink self.permuted as we give up on peers
         self.peer_index = 0
         self.goodness_points = 0
-        self.target_goodness = self._shares
         self.landlords = [] # list of (peerid, bucket_num, remotebucket)
 
         d = defer.maybeDeferred(self._check_next_peer)
-        d.addCallback(self._got_all_peers)
+        d.addCallback(self._got_enough_peers)
+        d.addCallback(self._compute_uri)
         return d
 
+    def _compute_uri(self, params):
+        return "URI:%s" % bencode.bencode((self._verifierid, params))
+
     def _check_next_peer(self):
+        if self.debug:
+            log.msg("FileUploader._check_next_peer: %d permuted, %d goodness"
+                    " (want %d), have %d landlords, %d total peers" %
+                    (len(self.permuted), self.goodness_points,
+                     self.target_goodness, len(self.landlords),
+                     self._total_peers))
         if len(self.permuted) == 0:
             # there are no more to check
             raise NotEnoughPeersError("%s goodness, want %s, have %d "
@@ -98,7 +129,8 @@ class FileUploader:
                     print " peerid %s will grant us a lease" % idlib.b2a(peerid)
                 self.landlords.append( (peerid, bucket_num, bucket) )
                 self.goodness_points += 1
-                if self.goodness_points >= self.target_goodness:
+                if (self.goodness_points >= self.target_goodness and
+                    len(self.landlords) >= self.min_shares):
                     if self.debug: print " we're done!"
                     raise HaveAllPeersError()
                 # otherwise we fall through to allocate more peers
@@ -129,11 +161,52 @@ class FileUploader:
         d.addBoth(_done_with_peer)
         return d
 
-    def _got_all_peers(self, res):
-        d = self._encoder.do_upload(self.landlords)
-        d.addCallback(lambda res: self._verifierid)
+    def _got_enough_peers(self, res):
+        landlords = self.landlords
+        if self.debug:
+            log.msg("FileUploader._got_enough_peers")
+            log.msg(" %d landlords" % len(landlords))
+            if len(landlords) < 20:
+                log.msg(" peerids: %s" % " ".join([idlib.b2a(l[0])
+                                                   for l in landlords]))
+                log.msg(" buckets: %s" % " ".join([str(l[1])
+                                                   for l in landlords]))
+        # assign shares to landlords
+        self.sharemap = {}
+        for peerid, bucket_num, bucket in landlords:
+            self.sharemap[bucket_num] = bucket
+        # the sharemap should have exactly len(landlords) shares, with
+        # no holes
+        assert sorted(self.sharemap.keys()) == range(len(landlords))
+        # encode all the data at once: this class does not use segmentation
+        data = self._filehandle.read()
+        d = self._encoder.encode(data, len(landlords))
+        d.addCallback(self._send_all_shares)
+        d.addCallback(lambda res: self._encoder.get_serialized_params())
+        return d
+
+    def _send_one_share(self, bucket, sharedata, metadata):
+        d = bucket.callRemote("write", sharedata)
+        d.addCallback(lambda res:
+                      bucket.callRemote("set_metadata", metadata))
+        d.addCallback(lambda res:
+                      bucket.callRemote("close"))
         return d
 
+    def _send_all_shares(self, shares):
+        dl = []
+        for share in shares:
+            (sharenum,sharedata) = share
+            if self.debug:
+                log.msg(" writing share %d" % sharenum)
+            metadata = bencode.bencode(sharenum)
+            assert len(sharedata) == self._share_size
+            assert isinstance(sharedata, str)
+            bucket = self.sharemap[sharenum]
+            d = self._send_one_share(bucket, sharedata, metadata)
+            dl.append(d)
+        return DeferredListShouldSucceed(dl)
+
 def netstring(s):
     return "%d:%s," % (len(s), s)
 
@@ -175,24 +248,35 @@ class Uploader(service.MultiService):
     """I am a service that allows file uploading.
     """
     name = "uploader"
+    uploader_class = FileUploader
+    debug = False
 
     def _compute_verifierid(self, f):
         hasher = sha.new(netstring("allmydata_v1_verifierid"))
         f.seek(0)
-        hasher.update(f.read())
+        data = f.read()
+        hasher.update(data)#f.read())
         f.seek(0)
         # note: this is only of the plaintext data, no encryption yet
         return hasher.digest()
 
     def upload(self, f):
+        # this returns (verifierid, encoding_params)
         assert self.parent
         assert self.running
         f = IUploadable(f)
         fh = f.get_filehandle()
-        u = FileUploader(self.parent)
+        u = self.uploader_class(self.parent)
+        if self.debug:
+            u.debug = True
         u.set_filehandle(fh)
+        # TODO: change this to (2,2,4) once Foolscap is fixed to allow
+        # connect-to-self and Client is fixed to include ourselves in the
+        # peerlist. Otherwise this usually fails because we give a share to
+        # the eventual downloader, and they won't try to get a share from
+        # themselves.
+        u.set_params(2, 3, 4)
         u.set_verifierid(self._compute_verifierid(fh))
-        u.make_encoder()
         d = u.start()
         def _done(res):
             f.close_filehandle(fh)
diff --git a/src/allmydata/util/deferredutil.py b/src/allmydata/util/deferredutil.py
new file mode 100644 (file)
index 0000000..ca62e5a
--- /dev/null
@@ -0,0 +1,17 @@
+
+from twisted.internet import defer
+
+# utility wrapper for DeferredList
+def _check_deferred_list(results):
+    # if any of the component Deferreds failed, return the first failure such
+    # that an addErrback() would fire. If all were ok, return a list of the
+    # results (without the success/failure booleans)
+    for success,f in results:
+        if not success:
+            return f
+    return [r[1] for r in results]
+def DeferredListShouldSucceed(dl):
+    d = defer.DeferredList(dl)
+    d.addCallback(_check_deferred_list)
+    return d
+
index b447a08d996b4e734132ce3599fb0c30f6218e5b..cd6882db674f2d34425ca06804cb33e257cf96cd 100644 (file)
@@ -1,7 +1,14 @@
 from base64 import b32encode, b32decode
 
 def b2a(i):
+    assert isinstance(i, str), "tried to idlib.b2a non-string '%s'" % (i,)
     return b32encode(i).lower()
 
 def a2b(i):
-    return b32decode(i.upper())
+    assert isinstance(i, str), "tried to idlib.a2b non-string '%s'" % (i,)
+    try:
+        return b32decode(i.upper())
+    except TypeError:
+        print "b32decode failed on a %s byte string '%s'" % (len(i), i)
+        raise
+
index 22e012d08888cdaaac74a4eb1e31aca2ecd30371..bbb3eda5240b88eeedbda92dfba2720fd0c742e4 100644 (file)
@@ -84,8 +84,9 @@ class VDrive(service.MultiService):
         d = self.dirpath(dir_or_path)
         def _got_dir(dirnode):
             d1 = ul.upload(uploadable)
-            d1.addCallback(lambda vid:
-                           dirnode.callRemote("add_file", name, vid))
+            def _add(uri):
+                return dirnode.callRemote("add_file", name, uri)
+            d1.addCallback(_add)
             return d1
         d.addCallback(_got_dir)
         def _done(res):