From 9fdc74165769fdb02dc643a88a7b4c4c9b7e2e39 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Sun, 3 Dec 2006 03:01:43 -0700 Subject: [PATCH] implement/test download, modify Storage to match --- allmydata/bucketstore.py | 8 +- allmydata/client.py | 2 + allmydata/download.py | 145 +++++++++++++++++++++++++++++++++ allmydata/encode.py | 34 ++++++++ allmydata/interfaces.py | 4 +- allmydata/storageserver.py | 4 +- allmydata/test/test_storage.py | 21 ++--- allmydata/test/test_system.py | 26 ++++-- allmydata/upload.py | 1 + 9 files changed, 217 insertions(+), 28 deletions(-) create mode 100644 allmydata/download.py diff --git a/allmydata/bucketstore.py b/allmydata/bucketstore.py index ddd7cfc9..01b3bdd6 100644 --- a/allmydata/bucketstore.py +++ b/allmydata/bucketstore.py @@ -37,13 +37,15 @@ class BucketStore(service.MultiService, Referenceable): self._leases.add(lease) return lease - def get_bucket(self, verifierid): + def get_buckets(self, verifierid): # for now, only returns those created by this process, in this run bucket_dir = self._get_bucket_dir(verifierid) if os.path.exists(bucket_dir): - return BucketReader(ReadBucket(bucket_dir, verifierid)) + b = ReadBucket(bucket_dir, verifierid) + br = BucketReader(b) + return [(b.get_bucket_num(), br)] else: - return NoSuchBucketError() + return [] class Lease(Referenceable): implements(RIBucketWriter) diff --git a/allmydata/client.py b/allmydata/client.py index a8608722..81473afb 100644 --- a/allmydata/client.py +++ b/allmydata/client.py @@ -11,6 +11,7 @@ from twisted.internet import defer from allmydata.storageserver import StorageServer from allmydata.upload import Uploader +from allmydata.download import Downloader from allmydata.util import idlib class Client(node.Node, Referenceable): @@ -27,6 +28,7 @@ class Client(node.Node, Referenceable): self.connections = {} self.add_service(StorageServer(os.path.join(basedir, self.STOREDIR))) self.add_service(Uploader()) + self.add_service(Downloader()) self.queen_pburl = None self.queen_connector = None diff --git a/allmydata/download.py b/allmydata/download.py new file mode 100644 index 00000000..ecdaae5d --- /dev/null +++ b/allmydata/download.py @@ -0,0 +1,145 @@ + +from twisted.python import failure, log +from twisted.internet import defer +from twisted.application import service + +from allmydata.util import idlib +from allmydata import encode + +from cStringIO import StringIO + +class NotEnoughPeersError(Exception): + pass + +class HaveAllPeersError(Exception): + # we use this to jump out of the loop + pass + +class FileDownloader: + debug = False + + def __init__(self, peer, verifierid): + self._peer = peer + assert isinstance(verifierid, str) + self._verifierid = verifierid + + def set_filehandle(self, filehandle): + self._filehandle = filehandle + + def make_decoder(self): + n = self._shares = 4 + k = self._desired_shares = 2 + self._decoder = encode.Decoder(self._filehandle, k, n, + self._verifierid) + + def start(self): + log.msg("starting download") + if self.debug: + print "starting download" + # first step: who should we download from? + + # maybe limit max_peers to 2*len(self.shares), to reduce memory + # footprint + max_peers = None + + self.permuted = self._peer.permute_peerids(self._verifierid, max_peers) + for p in self.permuted: + assert isinstance(p, str) + self.landlords = [] # list of (peerid, bucket_num, remotebucket) + + d = defer.maybeDeferred(self._check_next_peer) + d.addCallback(self._got_all_peers) + return d + + def _check_next_peer(self): + if len(self.permuted) == 0: + # there are no more to check + raise NotEnoughPeersError + peerid = self.permuted.pop(0) + + d = self._peer.get_remote_service(peerid, "storageserver") + def _got_peer(service): + bucket_num = len(self.landlords) + if self.debug: print "asking %s" % idlib.b2a(peerid) + d2 = service.callRemote("get_buckets", verifierid=self._verifierid) + def _got_response(buckets): + if buckets: + bucket_nums = [num for (num,bucket) in buckets] + if self.debug: + print " peerid %s has buckets %s" % (idlib.b2a(peerid), + bucket_nums) + + self.landlords.append( (peerid, buckets) ) + if len(self.landlords) >= self._desired_shares: + if self.debug: print " we're done!" + raise HaveAllPeersError + # otherwise we fall through to search more peers + d2.addCallback(_got_response) + return d2 + d.addCallback(_got_peer) + + def _done_with_peer(res): + if self.debug: print "done with peer %s:" % idlib.b2a(peerid) + if isinstance(res, failure.Failure): + if res.check(HaveAllPeersError): + if self.debug: print " all done" + # we're done! + return + if res.check(IndexError): + if self.debug: print " no connection" + else: + if self.debug: print " other error:", res + else: + if self.debug: print " they had data for us" + # we get here for either good peers (when we still need more), or + # after checking a bad peer (and thus still need more). So now we + # need to grab a new peer. + return self._check_next_peer() + d.addBoth(_done_with_peer) + return d + + def _got_all_peers(self, res): + all_buckets = [] + for peerid, buckets in self.landlords: + all_buckets.extend(buckets) + d = self._decoder.start(all_buckets) + return d + +def netstring(s): + return "%d:%s," % (len(s), s) + +class Downloader(service.MultiService): + """I am a service that allows file downloading. + """ + name = "downloader" + + def download_to_filename(self, verifierid, filename): + f = open(filename, "wb") + def _done(res): + f.close() + return res + d = self.download_filehandle(verifierid, f) + d.addBoth(_done) + return d + + def download_to_data(self, verifierid): + f = StringIO() + d = self.download_filehandle(verifierid, f) + def _done(res): + return f.getvalue() + d.addCallback(_done) + return d + + def download_filehandle(self, verifierid, f): + assert self.parent + assert self.running + assert isinstance(verifierid, str) + assert f.write + assert f.close + dl = FileDownloader(self.parent, verifierid) + dl.set_filehandle(f) + dl.make_decoder() + d = dl.start() + return d + + diff --git a/allmydata/encode.py b/allmydata/encode.py index a9b3d475..ea54fbd2 100644 --- a/allmydata/encode.py +++ b/allmydata/encode.py @@ -1,4 +1,9 @@ from twisted.internet import defer +import sha +from allmydata.util import idlib + +def netstring(s): + return "%d:%s," % (len(s), s) class Encoder(object): def __init__(self, infile, m): @@ -14,3 +19,32 @@ class Encoder(object): dl.append(remotebucket.callRemote('close')) return defer.DeferredList(dl) + +class Decoder(object): + def __init__(self, outfile, k, m, verifierid): + self.outfile = outfile + self.k = 2 + self.m = m + self._verifierid = verifierid + + def start(self, buckets): + assert len(buckets) >= self.k + dl = [] + for bucketnum, bucket in buckets[:self.k]: + d = bucket.callRemote("read") + dl.append(d) + d2 = defer.DeferredList(dl) + d2.addCallback(self._got_all_data) + return d2 + + def _got_all_data(self, resultslist): + shares = [results for success,results in resultslist if success] + assert len(shares) >= self.k + # here's where the Reed-Solomon magic takes place + self.outfile.write(shares[0]) + hasher = sha.new(netstring("allmydata_v1_verifierid")) + hasher.update(shares[0]) + vid = hasher.digest() + if self._verifierid: + assert self._verifierid == vid, "%s != %s" % (idlib.b2a(self._verifierid), idlib.b2a(vid)) + diff --git a/allmydata/interfaces.py b/allmydata/interfaces.py index 10899855..97866b9d 100644 --- a/allmydata/interfaces.py +++ b/allmydata/interfaces.py @@ -29,8 +29,8 @@ class RIStorageServer(RemoteInterface): def allocate_bucket(verifierid=Verifierid, bucket_num=int, size=int, leaser=Nodeid): return RIBucketWriter_ - def get_bucket(verifierid=Verifierid): - return RIBucketReader_ + def get_buckets(verifierid=Verifierid): + return ListOf(TupleOf(int, RIBucketReader_)) class RIBucketWriter(RemoteInterface): def write(data=ShareData): diff --git a/allmydata/storageserver.py b/allmydata/storageserver.py index 8bd25142..2d2ef79e 100644 --- a/allmydata/storageserver.py +++ b/allmydata/storageserver.py @@ -29,5 +29,5 @@ class StorageServer(service.MultiService, Referenceable): idlib.b2a(leaser)) return lease - def remote_get_bucket(self, verifierid): - return self._bucketstore.get_bucket(verifierid) + def remote_get_buckets(self, verifierid): + return self._bucketstore.get_buckets(verifierid) diff --git a/allmydata/test/test_storage.py b/allmydata/test/test_storage.py index 05ecb22c..69485080 100644 --- a/allmydata/test/test_storage.py +++ b/allmydata/test/test_storage.py @@ -62,24 +62,21 @@ class StorageTest(unittest.TestCase): rssd.addCallback(get_node_again) rssd.addCallback(get_storageserver) - def get_bucket(storageserver): - return storageserver.callRemote('get_bucket', verifierid=vid) - rssd.addCallback(get_bucket) + def get_buckets(storageserver): + return storageserver.callRemote('get_buckets', verifierid=vid) + rssd.addCallback(get_buckets) + + def read_buckets(buckets): + self.failUnlessEqual(len(buckets), 1) + bucket_num, bucket = buckets[0] + self.failUnlessEqual(bucket_num, bnum) - def read_bucket(bucket): def check_data(bytes_read): self.failUnlessEqual(bytes_read, data) d = bucket.callRemote('read') d.addCallback(check_data) - - def get_bucket_num(junk): - return bucket.callRemote('get_bucket_num') - d.addCallback(get_bucket_num) - def check_bucket_num(bucket_num): - self.failUnlessEqual(bucket_num, bnum) - d.addCallback(check_bucket_num) return d - rssd.addCallback(read_bucket) + rssd.addCallback(read_buckets) return rssd diff --git a/allmydata/test/test_system.py b/allmydata/test/test_system.py index e04ce620..5f934fe6 100644 --- a/allmydata/test/test_system.py +++ b/allmydata/test/test_system.py @@ -6,6 +6,7 @@ from allmydata import client, queen import os from foolscap.eventual import flushEventualQueue from twisted.python import log +from allmydata.util import idlib class SystemTest(unittest.TestCase): def setUp(self): @@ -60,18 +61,25 @@ class SystemTest(unittest.TestCase): return d test_connections.timeout = 20 - def test_upload(self): + def test_upload_and_download(self): + DATA = "Some data to upload\n" d = self.set_up_nodes() - def _upload(res): + def _do_upload(res): log.msg("UPLOADING") u = self.clients[0].getServiceNamed("uploader") - d1 = u.upload_data("Some data to upload\n") + d1 = u.upload_data(DATA) return d1 - d.addCallback(_upload) - def _done(res): - log.msg("DONE") - print "upload finished" - d.addCallback(_done) + d.addCallback(_do_upload) + def _upload_done(verifierid): + log.msg("upload finished: verifierid=%s" % idlib.b2a(verifierid)) + dl = self.clients[1].getServiceNamed("downloader") + d1 = dl.download_to_data(verifierid) + return d1 + d.addCallback(_upload_done) + def _download_done(data): + log.msg("download finished") + self.failUnlessEqual(data, DATA) + d.addCallback(_download_done) return d - test_upload.timeout = 20 + test_upload_and_download.timeout = 20 diff --git a/allmydata/upload.py b/allmydata/upload.py index 3205367e..6002cff2 100644 --- a/allmydata/upload.py +++ b/allmydata/upload.py @@ -121,6 +121,7 @@ class FileUploader: def _got_all_peers(self, res): d = self._encoder.do_upload(self.landlords) + d.addCallback(lambda res: self._verifierid) return d def netstring(s): -- 2.45.2