self._leases.add(lease)
return lease
- def get_bucket(self, verifierid):
+ def get_buckets(self, verifierid):
# for now, only returns those created by this process, in this run
bucket_dir = self._get_bucket_dir(verifierid)
if os.path.exists(bucket_dir):
- return BucketReader(ReadBucket(bucket_dir, verifierid))
+ b = ReadBucket(bucket_dir, verifierid)
+ br = BucketReader(b)
+ return [(b.get_bucket_num(), br)]
else:
- return NoSuchBucketError()
+ return []
class Lease(Referenceable):
implements(RIBucketWriter)
from allmydata.storageserver import StorageServer
from allmydata.upload import Uploader
+from allmydata.download import Downloader
from allmydata.util import idlib
class Client(node.Node, Referenceable):
self.connections = {}
self.add_service(StorageServer(os.path.join(basedir, self.STOREDIR)))
self.add_service(Uploader())
+ self.add_service(Downloader())
self.queen_pburl = None
self.queen_connector = None
--- /dev/null
+
+from twisted.python import failure, log
+from twisted.internet import defer
+from twisted.application import service
+
+from allmydata.util import idlib
+from allmydata import encode
+
+from cStringIO import StringIO
+
+class NotEnoughPeersError(Exception):
+ pass
+
+class HaveAllPeersError(Exception):
+ # we use this to jump out of the loop
+ pass
+
+class FileDownloader:
+ debug = False
+
+ def __init__(self, peer, verifierid):
+ self._peer = peer
+ assert isinstance(verifierid, str)
+ self._verifierid = verifierid
+
+ def set_filehandle(self, filehandle):
+ self._filehandle = filehandle
+
+ def make_decoder(self):
+ n = self._shares = 4
+ k = self._desired_shares = 2
+ self._decoder = encode.Decoder(self._filehandle, k, n,
+ self._verifierid)
+
+ def start(self):
+ log.msg("starting download")
+ if self.debug:
+ print "starting download"
+ # first step: who should we download from?
+
+ # maybe limit max_peers to 2*len(self.shares), to reduce memory
+ # footprint
+ max_peers = None
+
+ self.permuted = self._peer.permute_peerids(self._verifierid, max_peers)
+ for p in self.permuted:
+ assert isinstance(p, str)
+        self.landlords = [] # list of (peerid, buckets) tuples, where
+                            # buckets is a list of (bucket_num, bucket) pairs
+
+ d = defer.maybeDeferred(self._check_next_peer)
+ d.addCallback(self._got_all_peers)
+ return d
+
+ def _check_next_peer(self):
+ if len(self.permuted) == 0:
+ # there are no more to check
+ raise NotEnoughPeersError
+ peerid = self.permuted.pop(0)
+
+ d = self._peer.get_remote_service(peerid, "storageserver")
+ def _got_peer(service):
+ bucket_num = len(self.landlords)
+ if self.debug: print "asking %s" % idlib.b2a(peerid)
+ d2 = service.callRemote("get_buckets", verifierid=self._verifierid)
+ def _got_response(buckets):
+ if buckets:
+ bucket_nums = [num for (num,bucket) in buckets]
+ if self.debug:
+ print " peerid %s has buckets %s" % (idlib.b2a(peerid),
+ bucket_nums)
+
+ self.landlords.append( (peerid, buckets) )
+ if len(self.landlords) >= self._desired_shares:
+ if self.debug: print " we're done!"
+ raise HaveAllPeersError
+ # otherwise we fall through to search more peers
+ d2.addCallback(_got_response)
+ return d2
+ d.addCallback(_got_peer)
+
+ def _done_with_peer(res):
+ if self.debug: print "done with peer %s:" % idlib.b2a(peerid)
+ if isinstance(res, failure.Failure):
+ if res.check(HaveAllPeersError):
+ if self.debug: print " all done"
+ # we're done!
+ return
+ if res.check(IndexError):
+ if self.debug: print " no connection"
+ else:
+ if self.debug: print " other error:", res
+ else:
+ if self.debug: print " they had data for us"
+ # we get here for either good peers (when we still need more), or
+ # after checking a bad peer (and thus still need more). So now we
+ # need to grab a new peer.
+ return self._check_next_peer()
+ d.addBoth(_done_with_peer)
+ return d
+
+ def _got_all_peers(self, res):
+ all_buckets = []
+ for peerid, buckets in self.landlords:
+ all_buckets.extend(buckets)
+ d = self._decoder.start(all_buckets)
+ return d
+
+def netstring(s):
+ return "%d:%s," % (len(s), s)
+
+class Downloader(service.MultiService):
+ """I am a service that allows file downloading.
+ """
+ name = "downloader"
+
+ def download_to_filename(self, verifierid, filename):
+ f = open(filename, "wb")
+ def _done(res):
+ f.close()
+ return res
+ d = self.download_filehandle(verifierid, f)
+ d.addBoth(_done)
+ return d
+
+ def download_to_data(self, verifierid):
+ f = StringIO()
+ d = self.download_filehandle(verifierid, f)
+ def _done(res):
+ return f.getvalue()
+ d.addCallback(_done)
+ return d
+
+ def download_filehandle(self, verifierid, f):
+ assert self.parent
+ assert self.running
+ assert isinstance(verifierid, str)
+ assert f.write
+ assert f.close
+ dl = FileDownloader(self.parent, verifierid)
+ dl.set_filehandle(f)
+ dl.make_decoder()
+ d = dl.start()
+ return d
+
+
from twisted.internet import defer
+import sha
+from allmydata.util import idlib
+
+def netstring(s):
+ return "%d:%s," % (len(s), s)
class Encoder(object):
def __init__(self, infile, m):
dl.append(remotebucket.callRemote('close'))
return defer.DeferredList(dl)
+
+class Decoder(object):
+ def __init__(self, outfile, k, m, verifierid):
+ self.outfile = outfile
+        self.k = k
+ self.m = m
+ self._verifierid = verifierid
+
+ def start(self, buckets):
+ assert len(buckets) >= self.k
+ dl = []
+ for bucketnum, bucket in buckets[:self.k]:
+ d = bucket.callRemote("read")
+ dl.append(d)
+ d2 = defer.DeferredList(dl)
+ d2.addCallback(self._got_all_data)
+ return d2
+
+ def _got_all_data(self, resultslist):
+ shares = [results for success,results in resultslist if success]
+ assert len(shares) >= self.k
+ # here's where the Reed-Solomon magic takes place
+ self.outfile.write(shares[0])
+ hasher = sha.new(netstring("allmydata_v1_verifierid"))
+ hasher.update(shares[0])
+ vid = hasher.digest()
+ if self._verifierid:
+ assert self._verifierid == vid, "%s != %s" % (idlib.b2a(self._verifierid), idlib.b2a(vid))
+
def allocate_bucket(verifierid=Verifierid, bucket_num=int, size=int,
leaser=Nodeid):
return RIBucketWriter_
- def get_bucket(verifierid=Verifierid):
- return RIBucketReader_
+ def get_buckets(verifierid=Verifierid):
+ return ListOf(TupleOf(int, RIBucketReader_))
class RIBucketWriter(RemoteInterface):
def write(data=ShareData):
idlib.b2a(leaser))
return lease
- def remote_get_bucket(self, verifierid):
- return self._bucketstore.get_bucket(verifierid)
+ def remote_get_buckets(self, verifierid):
+ return self._bucketstore.get_buckets(verifierid)
rssd.addCallback(get_node_again)
rssd.addCallback(get_storageserver)
- def get_bucket(storageserver):
- return storageserver.callRemote('get_bucket', verifierid=vid)
- rssd.addCallback(get_bucket)
+ def get_buckets(storageserver):
+ return storageserver.callRemote('get_buckets', verifierid=vid)
+ rssd.addCallback(get_buckets)
+
+ def read_buckets(buckets):
+ self.failUnlessEqual(len(buckets), 1)
+ bucket_num, bucket = buckets[0]
+ self.failUnlessEqual(bucket_num, bnum)
- def read_bucket(bucket):
def check_data(bytes_read):
self.failUnlessEqual(bytes_read, data)
d = bucket.callRemote('read')
d.addCallback(check_data)
-
- def get_bucket_num(junk):
- return bucket.callRemote('get_bucket_num')
- d.addCallback(get_bucket_num)
- def check_bucket_num(bucket_num):
- self.failUnlessEqual(bucket_num, bnum)
- d.addCallback(check_bucket_num)
return d
- rssd.addCallback(read_bucket)
+ rssd.addCallback(read_buckets)
return rssd
import os
from foolscap.eventual import flushEventualQueue
from twisted.python import log
+from allmydata.util import idlib
class SystemTest(unittest.TestCase):
def setUp(self):
return d
test_connections.timeout = 20
- def test_upload(self):
+ def test_upload_and_download(self):
+ DATA = "Some data to upload\n"
d = self.set_up_nodes()
- def _upload(res):
+ def _do_upload(res):
log.msg("UPLOADING")
u = self.clients[0].getServiceNamed("uploader")
- d1 = u.upload_data("Some data to upload\n")
+ d1 = u.upload_data(DATA)
return d1
- d.addCallback(_upload)
- def _done(res):
- log.msg("DONE")
- print "upload finished"
- d.addCallback(_done)
+ d.addCallback(_do_upload)
+ def _upload_done(verifierid):
+ log.msg("upload finished: verifierid=%s" % idlib.b2a(verifierid))
+ dl = self.clients[1].getServiceNamed("downloader")
+ d1 = dl.download_to_data(verifierid)
+ return d1
+ d.addCallback(_upload_done)
+ def _download_done(data):
+ log.msg("download finished")
+ self.failUnlessEqual(data, DATA)
+ d.addCallback(_download_done)
return d
- test_upload.timeout = 20
+ test_upload_and_download.timeout = 20
def _got_all_peers(self, res):
d = self._encoder.do_upload(self.landlords)
+ d.addCallback(lambda res: self._verifierid)
return d
def netstring(s):