implement/test download, modify Storage to match
authorBrian Warner <warner@lothar.com>
Sun, 3 Dec 2006 10:01:43 +0000 (03:01 -0700)
committerBrian Warner <warner@lothar.com>
Sun, 3 Dec 2006 10:01:43 +0000 (03:01 -0700)
allmydata/bucketstore.py
allmydata/client.py
allmydata/download.py [new file with mode: 0644]
allmydata/encode.py
allmydata/interfaces.py
allmydata/storageserver.py
allmydata/test/test_storage.py
allmydata/test/test_system.py
allmydata/upload.py

index ddd7cfc922425e7db71b11978c7815b6c319ae81..01b3bdd6252f812220711e8ab483739f1fee019f 100644 (file)
@@ -37,13 +37,15 @@ class BucketStore(service.MultiService, Referenceable):
         self._leases.add(lease)
         return lease
 
-    def get_bucket(self, verifierid):
+    def get_buckets(self, verifierid):
         # for now, only returns those created by this process, in this run
         bucket_dir = self._get_bucket_dir(verifierid)
         if os.path.exists(bucket_dir):
-            return BucketReader(ReadBucket(bucket_dir, verifierid))
+            b = ReadBucket(bucket_dir, verifierid)
+            br = BucketReader(b)
+            return [(b.get_bucket_num(), br)]
         else:
-            return NoSuchBucketError()
+            return []
 
 class Lease(Referenceable):
     implements(RIBucketWriter)
index a860872293fb0c3e1fb361adef29c76cd57a68f5..81473afb0fff73a16b23c485c2fab30be5163901 100644 (file)
@@ -11,6 +11,7 @@ from twisted.internet import defer
 
 from allmydata.storageserver import StorageServer
 from allmydata.upload import Uploader
+from allmydata.download import Downloader
 from allmydata.util import idlib
 
 class Client(node.Node, Referenceable):
@@ -27,6 +28,7 @@ class Client(node.Node, Referenceable):
         self.connections = {}
         self.add_service(StorageServer(os.path.join(basedir, self.STOREDIR)))
         self.add_service(Uploader())
+        self.add_service(Downloader())
         self.queen_pburl = None
         self.queen_connector = None
 
diff --git a/allmydata/download.py b/allmydata/download.py
new file mode 100644 (file)
index 0000000..ecdaae5
--- /dev/null
@@ -0,0 +1,145 @@
+
+from twisted.python import failure, log
+from twisted.internet import defer
+from twisted.application import service
+
+from allmydata.util import idlib
+from allmydata import encode
+
+from cStringIO import StringIO
+
+class NotEnoughPeersError(Exception):
+    pass
+
+class HaveAllPeersError(Exception):
+    # we use this to jump out of the loop
+    pass
+
+class FileDownloader:
+    debug = False
+
+    def __init__(self, peer, verifierid):
+        self._peer = peer
+        assert isinstance(verifierid, str)
+        self._verifierid = verifierid
+
+    def set_filehandle(self, filehandle):
+        self._filehandle = filehandle
+
+    def make_decoder(self):
+        n = self._shares = 4
+        k = self._desired_shares = 2
+        self._decoder = encode.Decoder(self._filehandle, k, n,
+                                       self._verifierid)
+
+    def start(self):
+        log.msg("starting download")
+        if self.debug:
+            print "starting download"
+        # first step: who should we download from?
+
+        # maybe limit max_peers to 2*len(self.shares), to reduce memory
+        # footprint
+        max_peers = None
+
+        self.permuted = self._peer.permute_peerids(self._verifierid, max_peers)
+        for p in self.permuted:
+            assert isinstance(p, str)
+        self.landlords = [] # list of (peerid, bucket_num, remotebucket)
+
+        d = defer.maybeDeferred(self._check_next_peer)
+        d.addCallback(self._got_all_peers)
+        return d
+
+    def _check_next_peer(self):
+        if len(self.permuted) == 0:
+            # there are no more to check
+            raise NotEnoughPeersError
+        peerid = self.permuted.pop(0)
+
+        d = self._peer.get_remote_service(peerid, "storageserver")
+        def _got_peer(service):
+            bucket_num = len(self.landlords)
+            if self.debug: print "asking %s" % idlib.b2a(peerid)
+            d2 = service.callRemote("get_buckets", verifierid=self._verifierid)
+            def _got_response(buckets):
+                if buckets:
+                    bucket_nums = [num for (num,bucket) in buckets]
+                    if self.debug:
+                        print " peerid %s has buckets %s" % (idlib.b2a(peerid),
+                                                             bucket_nums)
+
+                    self.landlords.append( (peerid, buckets) )
+                if len(self.landlords) >= self._desired_shares:
+                    if self.debug: print " we're done!"
+                    raise HaveAllPeersError
+                # otherwise we fall through to search more peers
+            d2.addCallback(_got_response)
+            return d2
+        d.addCallback(_got_peer)
+
+        def _done_with_peer(res):
+            if self.debug: print "done with peer %s:" % idlib.b2a(peerid)
+            if isinstance(res, failure.Failure):
+                if res.check(HaveAllPeersError):
+                    if self.debug: print " all done"
+                    # we're done!
+                    return
+                if res.check(IndexError):
+                    if self.debug: print " no connection"
+                else:
+                    if self.debug: print " other error:", res
+            else:
+                if self.debug: print " they had data for us"
+            # we get here for either good peers (when we still need more), or
+            # after checking a bad peer (and thus still need more). So now we
+            # need to grab a new peer.
+            return self._check_next_peer()
+        d.addBoth(_done_with_peer)
+        return d
+
+    def _got_all_peers(self, res):
+        all_buckets = []
+        for peerid, buckets in self.landlords:
+            all_buckets.extend(buckets)
+        d = self._decoder.start(all_buckets)
+        return d
+
+def netstring(s):
+    return "%d:%s," % (len(s), s)
+
+class Downloader(service.MultiService):
+    """I am a service that allows file downloading.
+    """
+    name = "downloader"
+
+    def download_to_filename(self, verifierid, filename):
+        f = open(filename, "wb")
+        def _done(res):
+            f.close()
+            return res
+        d = self.download_filehandle(verifierid, f)
+        d.addBoth(_done)
+        return d
+
+    def download_to_data(self, verifierid):
+        f = StringIO()
+        d = self.download_filehandle(verifierid, f)
+        def _done(res):
+            return f.getvalue()
+        d.addCallback(_done)
+        return d
+
+    def download_filehandle(self, verifierid, f):
+        assert self.parent
+        assert self.running
+        assert isinstance(verifierid, str)
+        assert f.write
+        assert f.close
+        dl = FileDownloader(self.parent, verifierid)
+        dl.set_filehandle(f)
+        dl.make_decoder()
+        d = dl.start()
+        return d
+
+
index a9b3d4759b7591dd9210ad72c8c514eac3ec1bc1..ea54fbd2bb9d79384a56e99c5d4cce9e6aabc322 100644 (file)
@@ -1,4 +1,9 @@
 from twisted.internet import defer
+import sha
+from allmydata.util import idlib
+
+def netstring(s):
+    return "%d:%s," % (len(s), s)
 
 class Encoder(object):
     def __init__(self, infile, m):
@@ -14,3 +19,32 @@ class Encoder(object):
             dl.append(remotebucket.callRemote('close'))
 
         return defer.DeferredList(dl)
+
+class Decoder(object):
+    def __init__(self, outfile, k, m, verifierid):
+        self.outfile = outfile
+        self.k = 2
+        self.m = m
+        self._verifierid = verifierid
+
+    def start(self, buckets):
+        assert len(buckets) >= self.k
+        dl = []
+        for bucketnum, bucket in buckets[:self.k]:
+            d = bucket.callRemote("read")
+            dl.append(d)
+        d2 = defer.DeferredList(dl)
+        d2.addCallback(self._got_all_data)
+        return d2
+
+    def _got_all_data(self, resultslist):
+        shares = [results for success,results in resultslist if success]
+        assert len(shares) >= self.k
+        # here's where the Reed-Solomon magic takes place
+        self.outfile.write(shares[0])
+        hasher = sha.new(netstring("allmydata_v1_verifierid"))
+        hasher.update(shares[0])
+        vid = hasher.digest()
+        if self._verifierid:
+            assert self._verifierid == vid, "%s != %s" % (idlib.b2a(self._verifierid), idlib.b2a(vid))
+
index 108998558626c124262aea288f83690cf0ef8ed6..97866b9db1d436d710d0aaf86225ea2deb2c608b 100644 (file)
@@ -29,8 +29,8 @@ class RIStorageServer(RemoteInterface):
     def allocate_bucket(verifierid=Verifierid, bucket_num=int, size=int,
                         leaser=Nodeid):
         return RIBucketWriter_
-    def get_bucket(verifierid=Verifierid):
-        return RIBucketReader_
+    def get_buckets(verifierid=Verifierid):
+        return ListOf(TupleOf(int, RIBucketReader_))
 
 class RIBucketWriter(RemoteInterface):
     def write(data=ShareData):
index 8bd2514256da4c0f0045d1bcb6582f9c2d3dbe6c..2d2ef79e6e2c66ae1ae0fa15e18831941a7d794c 100644 (file)
@@ -29,5 +29,5 @@ class StorageServer(service.MultiService, Referenceable):
                                                   idlib.b2a(leaser))
         return lease
 
-    def remote_get_bucket(self, verifierid):
-        return self._bucketstore.get_bucket(verifierid)
+    def remote_get_buckets(self, verifierid):
+        return self._bucketstore.get_buckets(verifierid)
index 05ecb22cb5a08a3a0dcaad61c9fbba54752a946b..69485080704a66e8cb544570545bc2446618569e 100644 (file)
@@ -62,24 +62,21 @@ class StorageTest(unittest.TestCase):
         rssd.addCallback(get_node_again)
         rssd.addCallback(get_storageserver)
 
-        def get_bucket(storageserver):
-            return storageserver.callRemote('get_bucket', verifierid=vid)
-        rssd.addCallback(get_bucket)
+        def get_buckets(storageserver):
+            return storageserver.callRemote('get_buckets', verifierid=vid)
+        rssd.addCallback(get_buckets)
+
+        def read_buckets(buckets):
+            self.failUnlessEqual(len(buckets), 1)
+            bucket_num, bucket = buckets[0]
+            self.failUnlessEqual(bucket_num, bnum)
 
-        def read_bucket(bucket):
             def check_data(bytes_read):
                 self.failUnlessEqual(bytes_read, data)
             d = bucket.callRemote('read')
             d.addCallback(check_data)
-
-            def get_bucket_num(junk):
-                return bucket.callRemote('get_bucket_num')
-            d.addCallback(get_bucket_num)
-            def check_bucket_num(bucket_num):
-                self.failUnlessEqual(bucket_num, bnum)
-            d.addCallback(check_bucket_num)
             return d
-        rssd.addCallback(read_bucket)
+        rssd.addCallback(read_buckets)
 
         return rssd
 
index e04ce620db553d35ef43f237fd43afa7ef628b2c..5f934fe6ac00f0107de7bfeb5854640fb0e57b79 100644 (file)
@@ -6,6 +6,7 @@ from allmydata import client, queen
 import os
 from foolscap.eventual import flushEventualQueue
 from twisted.python import log
+from allmydata.util import idlib
 
 class SystemTest(unittest.TestCase):
     def setUp(self):
@@ -60,18 +61,25 @@ class SystemTest(unittest.TestCase):
         return d
     test_connections.timeout = 20
 
-    def test_upload(self):
+    def test_upload_and_download(self):
+        DATA = "Some data to upload\n"
         d = self.set_up_nodes()
-        def _upload(res):
+        def _do_upload(res):
             log.msg("UPLOADING")
             u = self.clients[0].getServiceNamed("uploader")
-            d1 = u.upload_data("Some data to upload\n")
+            d1 = u.upload_data(DATA)
             return d1
-        d.addCallback(_upload)
-        def _done(res):
-            log.msg("DONE")
-            print "upload finished"
-        d.addCallback(_done)
+        d.addCallback(_do_upload)
+        def _upload_done(verifierid):
+            log.msg("upload finished: verifierid=%s" % idlib.b2a(verifierid))
+            dl = self.clients[1].getServiceNamed("downloader")
+            d1 = dl.download_to_data(verifierid)
+            return d1
+        d.addCallback(_upload_done)
+        def _download_done(data):
+            log.msg("download finished")
+            self.failUnlessEqual(data, DATA)
+        d.addCallback(_download_done)
         return d
-    test_upload.timeout = 20
+    test_upload_and_download.timeout = 20
 
index 3205367e23fc0a326eecf6efd961a4a00b26df60..6002cff2f3f4f453353344513f7c6cae58608be0 100644 (file)
@@ -121,6 +121,7 @@ class FileUploader:
 
     def _got_all_peers(self, res):
         d = self._encoder.do_upload(self.landlords)
+        d.addCallback(lambda res: self._verifierid)
         return d
 
 def netstring(s):