From: Zooko O'Whielacronx Date: Fri, 30 Mar 2007 03:19:52 +0000 (-0700) Subject: new upload and storage server X-Git-Url: https://git.rkrishnan.org/module-simplejson.encoder.html?a=commitdiff_plain;h=17299fc96e6fb855c3115497d2ce926e4ee6e02b;p=tahoe-lafs%2Ftahoe-lafs.git new upload and storage server --- diff --git a/src/allmydata/client.py b/src/allmydata/client.py index 72d96b24..6e0f9b60 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -7,6 +7,7 @@ from allmydata import node from twisted.internet import defer +from allmydata.Crypto.Util.number import bytes_to_long from allmydata.storageserver import StorageServer from allmydata.upload import Uploader from allmydata.download import Downloader @@ -101,16 +102,14 @@ class Client(node.Node, Referenceable): def get_all_peerids(self): return self.introducer_client.connections.iterkeys() - def permute_peerids(self, key, max_count=None): - # TODO: eventually reduce memory consumption by doing an insertion - # sort of at most max_count elements + def get_permuted_peers(self, key): + """ + @return: list of (permuted-peerid, peerid, connection,) + """ results = [] - for nodeid in self.get_all_peerids(): - assert isinstance(nodeid, str) - permuted = sha.new(key + nodeid).digest() - results.append((permuted, nodeid)) + for peerid, connection in self.introducer_client.connections.iteritems(): + assert isinstance(peerid, str) + permuted = bytes_to_long(sha.new(key + peerid).digest()) + results.append((permuted, peerid, connection)) results.sort() - results = [r[1] for r in results] - if max_count is None: - return results - return results[:max_count] + return results diff --git a/src/allmydata/download.py b/src/allmydata/download.py index 0810c962..f08f3a28 100644 --- a/src/allmydata/download.py +++ b/src/allmydata/download.py @@ -48,7 +48,7 @@ class FileDownloader: # footprint max_peers = None - self.permuted = self._peer.permute_peerids(self._verifierid, max_peers) + self.permuted = self._peer.get_permuted_connections(self._verifierid, max_peers) for p in self.permuted: assert isinstance(p, str) self.landlords = [] # list of (peerid, bucket_num, remotebucket) diff --git a/src/allmydata/encode_new.py b/src/allmydata/encode_new.py index 5b8a784c..13c25814 100644 --- a/src/allmydata/encode_new.py +++ b/src/allmydata/encode_new.py @@ -32,29 +32,29 @@ number of segments or log(number of segments)). Each segment (A,B,C) is read into memory, encrypted, and encoded into -subshares. The 'share' (say, share #1) that makes it out to a host is a -collection of these subshares (subshare A1, B1, C1), plus some hash-tree +blocks. The 'share' (say, share #1) that makes it out to a host is a +collection of these blocks (block A1, B1, C1), plus some hash-tree information necessary to validate the data upon retrieval. Only one segment -is handled at a time: all subshares for segment A are delivered before any +is handled at a time: all blocks for segment A are delivered before any work is begun on segment B. -As subshares are created, we retain the hash of each one. The list of -subshare hashes for a single share (say, hash(A1), hash(B1), hash(C1)) is +As blocks are created, we retain the hash of each one. The list of +block hashes for a single share (say, hash(A1), hash(B1), hash(C1)) is used to form the base of a Merkle hash tree for that share (hashtrees[1]). -This hash tree has one terminal leaf per subshare. The complete subshare hash +This hash tree has one terminal leaf per block. The complete block hash tree is sent to the shareholder after all the data has been sent. At retrieval time, the decoder will ask for specific pieces of this tree before -asking for subshares, whichever it needs to validate those subshares. +asking for blocks, whichever it needs to validate those blocks. -(Note: we don't really need to generate this whole subshare hash tree +(Note: we don't really need to generate this whole block hash tree ourselves. It would be sufficient to have the shareholder generate it and just tell us the root. This gives us an extra level of validation on the transfer, though, and it is relatively cheap to compute.) -Each of these subshare hash trees has a root hash. The collection of these +Each of these block hash trees has a root hash. The collection of these root hashes for all shares are collected into the 'share hash tree', which -has one terminal leaf per share. After sending the subshares and the complete -subshare hash tree to each shareholder, we send them the portion of the share +has one terminal leaf per share. After sending the blocks and the complete +block hash tree to each shareholder, we send them the portion of the share hash tree that is necessary to validate their share. The root of the share hash tree is put into the URI. @@ -197,7 +197,7 @@ class Encoder(object): if False: block = "".join(all_hashes) return self.send(shareid, "write", block, offset=0) - return self.send(shareid, "put_subshare_hashes", all_hashes) + return self.send(shareid, "put_block_hashes", all_hashes) def send_all_share_hash_trees(self): dl = [] @@ -229,27 +229,3 @@ class Encoder(object): def done(self): return self.root_hash - - -from foolscap import RemoteInterface -from foolscap.schema import ListOf, TupleOf - - -class RIStorageBucketWriter(RemoteInterface): - def put_subshare(segment_number=int, subshare=str): - return None - def put_segment_hashes(all_hashes=ListOf(str)): - return None - def put_share_hashes(needed_hashes=ListOf(TupleOf(int,str))): - return None - #def write(data=str, offset=int): - # return None -class RIStorageBucketReader(RemoteInterface): - def get_share_hashes(): - return ListOf(TupleOf(int,str)) - def get_segment_hashes(which=ListOf(int)): - return ListOf(str) - def get_subshare(segment_number=int): - return str - #def read(size=int, offset=int): - # return str diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 30d1257f..1cd60a7e 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -3,6 +3,9 @@ from zope.interface import Interface from foolscap.schema import StringConstraint, ListOf, TupleOf, Any from foolscap import RemoteInterface +HASH_SIZE=32 + +Hash = StringConstraint(HASH_SIZE) # binary format 32-byte SHA256 hash Nodeid = StringConstraint(20) # binary format 20-byte SHA1 hash PBURL = StringConstraint(150) Verifierid = StringConstraint(20) @@ -33,29 +36,39 @@ class RIClient(RemoteInterface): def get_nodeid(): return Nodeid -class RIStorageServer(RemoteInterface): - def allocate_bucket(verifierid=Verifierid, bucket_num=int, size=int, - leaser=Nodeid, canary=Referenceable_): - # if the canary is lost before close(), the bucket is deleted - return RIBucketWriter_ - def get_buckets(verifierid=Verifierid): - return ListOf(TupleOf(int, RIBucketReader_)) - class RIBucketWriter(RemoteInterface): - def write(data=ShareData): + def put_block(segmentnum=int, data=ShareData): + return None + + def put_block_hashes(blockhashes=ListOf(Hash)): return None - def set_metadata(metadata=str): + + def put_share_hashes(sharehashes=ListOf(TupleOf(int, Hash))): return None + def close(): + """ + If the data that has been written is incomplete or inconsistent then + the server will throw the data away, else it will store it for future + retrieval. + """ return None +class RIStorageServer(RemoteInterface): + def allocate_buckets(verifierid=Verifierid, sharenums=SetOf(int), + sharesize=int, blocksize=int, canary=Referenceable_): + # if the canary is lost before close(), the bucket is deleted + return TupleOf(SetOf(int), DictOf(int, RIBucketWriter)) + def get_buckets(verifierid=Verifierid): + return DictOf(int, RIBucketReader_) class RIBucketReader(RemoteInterface): - def read(): + def get_block(blocknum=int): return ShareData - def get_metadata(): - return str - + def get_block_hashes(): + return ListOf(Hash)) + def get_share_hashes(): + return ListOf(TupleOf(int, Hash)) class RIMutableDirectoryNode(RemoteInterface): def list(): @@ -291,12 +304,12 @@ class ICodecDecoder(Interface): """ class IEncoder(Interface): - """I take a sequence of plaintext bytes and a list of shareholders, then - encrypt, encode, hash, and deliver shares to those shareholders. I will - compute all the necessary Merkle hash trees that are necessary to - validate the data that eventually comes back from the shareholders. I - provide the root hash of the hash tree, and the encoding parameters, both - of which must be included in the URI. + """I take a file-like object that provides a sequence of bytes and a list + of shareholders, then encrypt, encode, hash, and deliver shares to those + shareholders. I will compute all the necessary Merkle hash trees that are + necessary to validate the data that eventually comes back from the + shareholders. I provide the root hash of the hash tree, and the encoding + parameters, both of which must be included in the URI. I do not choose shareholders, that is left to the IUploader. I must be given a dict of RemoteReferences to storage buckets that are ready and @@ -408,7 +421,6 @@ class IUploader(Interface): def upload_ssk(write_capability, new_version, uploadable): pass # TODO - def upload_data(data): """Like upload(), but accepts a string.""" diff --git a/src/allmydata/storageserver.py b/src/allmydata/storageserver.py index 868dc99a..4286accb 100644 --- a/src/allmydata/storageserver.py +++ b/src/allmydata/storageserver.py @@ -3,32 +3,113 @@ import os from foolscap import Referenceable from twisted.application import service -from allmydata.bucketstore import BucketStore from zope.interface import implements -from allmydata.interfaces import RIStorageServer -from allmydata.util import idlib +from allmydata.interfaces import RIStorageServer, RIBucketWriter +from allmydata import interfaces +from allmydata.util import bencode, fileutil, idlib +from allmydata.util.assertutil import _assert, precondition -class BucketAlreadyExistsError(Exception): - pass +# store/ +# store/tmp # temp dirs named $VERIFIERID/$SHARENUM that will be moved to store/ on success +# store/$VERIFIERID +# store/$VERIFIERID/$SHARENUM +# store/$VERIFIERID/$SHARENUM/blocksize +# store/$VERIFIERID/$SHARENUM/data +# store/$VERIFIERID/$SHARENUM/blockhashes +# store/$VERIFIERID/$SHARENUM/sharehashtree +# $SHARENUM matches this regex: +NUM_RE=re.compile("[1-9][0-9]*") + +class BucketWriter(Referenceable): + implements(RIBucketWriter) + + def __init__(self, tmphome, finalhome, blocksize): + self.tmphome = tmphome + self.finalhome = finalhome + self.blocksize = blocksize + self.closed = False + self._write_file('blocksize', str(blocksize)) + + def _write_file(self, fname, data): + open(os.path.join(tmphome, fname), 'wb').write(data) + + def remote_put_block(self, segmentnum, data): + precondition(not self.closed) + assert len(data) == self.blocksize + f = open(os.path.join(self.tmphome, 'data'), 'wb') + f.seek(self.blocksize*segmentnum) + f.write(data) + + def remote_put_block_hashes(self, blockhashes): + precondition(not self.closed) + # TODO: verify the length of blockhashes. + # TODO: tighten foolscap schema to require exactly 32 bytes. + self._write_file('blockhashes', ''.join(blockhashes)) + + def remote_put_share_hashes(self, sharehashes): + precondition(not self.closed) + self._write_file('sharehashree', bencode.bencode(sharehashes)) + + def close(self): + precondition(not self.closed) + # TODO assert or check the completeness and consistency of the data that has been written + fileutil.rename(self.tmphome, self.finalhome) + self.closed = True + +def str2l(s): + """ split string (pulled from storage) into a list of blockids """ + return [ s[i:i+interfaces.HASH_SIZE] for i in range(0, len(s), interfaces.HASH_SIZE) ] + +class BucketReader(Referenceable): + def __init__(self, home): + self.home = home + self.blocksize = int(self._read_file('blocksize')) + + def _read_file(self, fname): + return open(os.path.join(self.home, fname), 'rb').read() + + def remote_get_block(self, blocknum): + f = open(os.path.join(self.home, 'data'), 'rb') + f.seek(self.blocksize * blocknum) + return f.read(self.blocksize) + + def remote_get_block_hashes(self): + return str2l(self._read_file('blockhashes')) + + def remote_get_share_hashes(self): + return bencode.bdecode(self._read_file('sharehashes')) + class StorageServer(service.MultiService, Referenceable): implements(RIStorageServer) name = 'storageserver' - def __init__(self, store_dir): - if not os.path.isdir(store_dir): - os.mkdir(store_dir) + def __init__(self, storedir): + fileutil.make_dirs(storedir) + self.storedir = storedir + self.tmpdir = os.path.join(storedir, 'tmp') + self._clean_trash() + fileutil.make_dirs(self.tmpdir) + service.MultiService.__init__(self) - self._bucketstore = BucketStore(store_dir) - self._bucketstore.setServiceParent(self) - def remote_allocate_bucket(self, verifierid, bucket_num, size, leaser, - canary): - if self._bucketstore.has_bucket(verifierid): - raise BucketAlreadyExistsError() - lease = self._bucketstore.allocate_bucket(verifierid, bucket_num, size, - idlib.b2a(leaser), canary) - return lease + def _clean_trash(self): + fileutil.rm_dir(self.tmpdir) + + def remote_allocate_buckets(self, verifierid, sharenums, sharesize, + blocksize, canary): + bucketwriters = {} # k: sharenum, v: BucketWriter + for sharenum in sharenums: + tmphome = os.path.join(self.tmpdir, idlib.a2b(verifierid), "%d"%sharenum) + finalhome = os.path.join(self.storedir, idlib.a2b(verifierid), "%d"%sharenum) + bucketwriters[sharenum] = BucketWriter(tmphome, finalhome, blocksize) + + return bucketwriters def remote_get_buckets(self, verifierid): - return self._bucketstore.get_buckets(verifierid) + bucketreaders = {} # k: sharenum, v: BucketReader + verifierdir = os.path.join(self.storedir, idlib.b2a(verifierid)) + for f in os.listdir(verifierdir): + _assert(NUM_RE.match(f)) + bucketreaders[int(f)] = BucketReader(os.path.join(verifierdir, f)) + return bucketreaders diff --git a/src/allmydata/test/test_client.py b/src/allmydata/test/test_client.py index a0ce5f2a..b9bbba24 100644 --- a/src/allmydata/test/test_client.py +++ b/src/allmydata/test/test_client.py @@ -2,15 +2,14 @@ import os from twisted.trial import unittest -from allmydata import client +from allmydata import client, introducer -class MyClient(client.Client): - def __init__(self, basedir): +class MyIntroducerClient(introducer.IntroducerClient): + def __init__(self): self.connections = {} - client.Client.__init__(self, basedir) - def get_all_peerids(self): - return self.connections +def permute(c, key): + return [ y for x, y, z in c.get_permuted_peers(key) ] class Basic(unittest.TestCase): def test_loadable(self): @@ -25,17 +24,18 @@ class Basic(unittest.TestCase): os.mkdir(basedir) open(os.path.join(basedir, "introducer.furl"), "w").write("") open(os.path.join(basedir, "vdrive.furl"), "w").write("") - c = MyClient(basedir) + c = client.Client(basedir) + c.introducer_client = MyIntroducerClient() for k in ["%d" % i for i in range(5)]: - c.connections[k] = None - self.failUnlessEqual(c.permute_peerids("one"), ['3','1','0','4','2']) - self.failUnlessEqual(c.permute_peerids("one", 3), ['3','1','0']) - self.failUnlessEqual(c.permute_peerids("two"), ['0','4','2','1','3']) - c.connections.clear() - self.failUnlessEqual(c.permute_peerids("one"), []) - - c2 = MyClient(basedir) + c.introducer_client.connections[k] = None + self.failUnlessEqual(permute(c, "one"), ['3','1','0','4','2']) + self.failUnlessEqual(permute(c, "two"), ['0','4','2','1','3']) + c.introducer_client.connections.clear() + self.failUnlessEqual(permute(c, "one"), []) + + c2 = client.Client(basedir) + c2.introducer_client = MyIntroducerClient() for k in ["%d" % i for i in range(5)]: - c2.connections[k] = None - self.failUnlessEqual(c2.permute_peerids("one"), ['3','1','0','4','2']) + c2.introducer_client.connections[k] = None + self.failUnlessEqual(permute(c2, "one"), ['3','1','0','4','2']) diff --git a/src/allmydata/test/test_ring.py b/src/allmydata/test/test_ring.py new file mode 100644 index 00000000..59737f2b --- /dev/null +++ b/src/allmydata/test/test_ring.py @@ -0,0 +1,19 @@ +#! /usr/bin/python + +from twisted.trial import unittest + +from allmydata.util import ring + +class Ring(unittest.TestCase): + def test_1(self): + self.failUnlessEquals(ring.distance(8, 9), 1) + self.failUnlessEquals(ring.distance(9, 8), 2**160-1) + self.failUnlessEquals(ring.distance(2, 2**160-1), 2**160-3) + self.failUnlessEquals(ring.distance(2**160-1, 2), 3) + self.failUnlessEquals(ring.distance(0, 2**159), 2**159) + self.failUnlessEquals(ring.distance(2**159, 0), 2**159) + self.failUnlessEquals(ring.distance(2**159-1, 2**159+1), 2) + self.failUnlessEquals(ring.distance(2**159-1, 1), 2**159+2) + self.failUnlessEquals(ring.distance(2**159-1, 2**159-1), 0) + self.failUnlessEquals(ring.distance(0, 0), 0) + diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py index a17c3712..686f23b0 100644 --- a/src/allmydata/test/test_upload.py +++ b/src/allmydata/test/test_upload.py @@ -55,8 +55,7 @@ class FakeClient: else: self.peers.append(FakePeer(str(peerid), r)) - def permute_peerids(self, key, max_peers): - assert max_peers == None + def get_permuted_connections(self, key): return [str(i) for i in range(len(self.peers))] def get_remote_service(self, peerid, name): @@ -202,13 +201,12 @@ class FakePeer2: class FakeClient2: nodeid = "fakeclient" - def __init__(self, max_peers): + def __init__(self, num_peers): self.peers = [] - for peerid in range(max_peers): + for peerid in range(num_peers): self.peers.append(FakePeer2(str(peerid))) - def permute_peerids(self, key, max_peers): - assert max_peers == None + def get_permuted_connections(self, key): return [str(i) for i in range(len(self.peers))] def get_remote_service(self, peerid, name): diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index 0fdf309d..c994c9c0 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -1,19 +1,16 @@ - from zope.interface import implements -from twisted.python import failure, log +from twisted.python import log from twisted.internet import defer from twisted.application import service from foolscap import Referenceable -from allmydata.util import idlib, bencode, mathutil -from allmydata.util.idlib import peerid_to_short_string as shortid -from allmydata.util.deferredutil import DeferredListShouldSucceed +from allmydata.util import idlib, mathutil from allmydata import codec from allmydata.uri import pack_uri from allmydata.interfaces import IUploadable, IUploader from cStringIO import StringIO -import sha +import collections, random, sha class NotEnoughPeersError(Exception): pass @@ -26,19 +23,38 @@ class HaveAllPeersError(Exception): class TooFullError(Exception): pass +class PeerTracker: + def __init__(self, peerid, connection, sharesize, blocksize, verifierid): + self.peerid = peerid + self.connection = connection + self.buckets = {} # k: shareid, v: IRemoteBucketWriter + self.sharesize = sharesize + self.blocksize = blocksize + self.verifierid = verifierid + + def query(self, sharenums): + d = self.connection.callRemote("allocate_buckets", self._verifierid, + sharenums, self.sharesize, + self.blocksize, canary=Referenceable()) + d.addCallback(self._got_reply) + return d + + def _got_reply(self, (alreadygot, buckets)): + self.buckets.update(buckets) + return (alreadygot, set(buckets.keys())) class FileUploader: debug = False ENCODERCLASS = codec.CRSEncoder - def __init__(self, peer): - self._peer = peer + def __init__(self, client): + self._client = client - def set_params(self, min_shares, target_goodness, max_shares): - self.min_shares = min_shares - self.target_goodness = target_goodness - self.max_shares = max_shares + def set_params(self, needed_shares, shares_of_happiness, total_shares): + self.needed_shares = needed_shares + self.shares_of_happiness = shares_of_happiness + self.total_shares = total_shares def set_filehandle(self, filehandle): self._filehandle = filehandle @@ -64,195 +80,120 @@ class FileUploader: log.msg("starting upload [%s]" % (idlib.b2a(self._verifierid),)) if self.debug: print "starting upload" - assert self.min_shares - assert self.target_goodness + assert self.needed_shares # create the encoder, so we can know how large the shares will be - total_shares = self.max_shares - needed_shares = self.min_shares self._encoder = self.ENCODERCLASS() self._codec_name = self._encoder.get_encoder_type() - self._needed_shares = needed_shares - paddedsize = self._size + mathutil.pad_size(self._size, needed_shares) - self._encoder.set_params(paddedsize, needed_shares, total_shares) + paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares) + self._encoder.set_params(paddedsize, self.needed_shares, self.total_shares) self._share_size = self._encoder.get_share_size() # first step: who should we upload to? - - # We will talk to at most max_peers (which can be None to mean no - # limit). Maybe limit max_peers to 2*len(self.shares), to reduce - # memory footprint. For now, make it unlimited. - max_peers = None - - self.permuted = self._peer.permute_peerids(self._verifierid, max_peers) - self.peers_who_said_yes = [] - self.peers_who_said_no = [] - self.peers_who_had_errors = [] - - self._total_peers = len(self.permuted) - for p in self.permuted: - assert isinstance(p, str) - # we will shrink self.permuted as we give up on peers - - d = defer.maybeDeferred(self._find_peers) - d.addCallback(self._got_enough_peers) + peers = self._client.get_permuted_peers(self._verifierid) + assert peers + trackers = [ (permutedid, PeerTracker(peerid, conn),) + for permutedid, peerid, conn in peers ] + ring_things = [] # a list of (position_in_ring, whatami, x) where whatami is 0 if x is a sharenum or else 1 if x is a PeerTracker instance + ring_things.extend([ (permutedpeerid, 1, peer,) for permutedpeerid, peer in trackers ]) + shares = [ (i * 2**160 / self.total_shares, 0, i) for i in range(self.total_shares) ] + ring_things.extend(shares) + ring_things.sort() + self.ring_things = collections.deque(ring_things) + self.usable_peers = set([peer for permutedid, peer in trackers]) + self.used_peers = set() + self.unallocated_sharenums = set(shares) + + d = self._locate_all_shareholders() + d.addCallback(self._send_shares) d.addCallback(self._compute_uri) return d - def _compute_uri(self, params): - return pack_uri(self._codec_name, params, self._verifierid) - - def _build_not_enough_peers_error(self): - yes = ",".join([shortid(p) for p in self.peers_who_said_yes]) - no = ",".join([shortid(p) for p in self.peers_who_said_no]) - err = ",".join([shortid(p) for p in self.peers_who_had_errors]) - msg = ("%s goodness, want %s, have %d " - "landlords, %d total peers, " - "peers:yes=%s;no=%s;err=%s" % - (self.goodness_points, self.target_goodness, - len(self.landlords), self._total_peers, - yes, no, err)) - return msg - - def _find_peers(self): - # this returns a Deferred which fires (with a meaningless value) when - # enough peers are found, or errbacks with a NotEnoughPeersError if - # not. - self.peer_index = 0 - self.goodness_points = 0 - self.landlords = [] # list of (peerid, bucket_num, remotebucket) - return self._check_next_peer() - - def _check_next_peer(self): - if self.debug: - log.msg("FileUploader._check_next_peer: %d permuted, %d goodness" - " (want %d), have %d landlords, %d total peers" % - (len(self.permuted), self.goodness_points, - self.target_goodness, len(self.landlords), - self._total_peers)) - if (self.goodness_points >= self.target_goodness and - len(self.landlords) >= self.min_shares): - if self.debug: print " we're done!" - return "done" - if not self.permuted: - # we've run out of peers to check without finding enough, which - # means we won't be able to upload this file. Bummer. - msg = self._build_not_enough_peers_error() - log.msg("NotEnoughPeersError: %s" % msg) - raise NotEnoughPeersError(msg) - - # otherwise we use self.peer_index to rotate through all the usable - # peers. It gets inremented elsewhere, but wrapped here. - if self.peer_index >= len(self.permuted): - self.peer_index = 0 - - peerid = self.permuted[self.peer_index] - - d = self._check_peer(peerid) - d.addCallback(lambda res: self._check_next_peer()) - return d - - def _check_peer(self, peerid): - # contact a single peer, and ask them to hold a share. If they say - # yes, we update self.landlords and self.goodness_points, and - # increment self.peer_index. If they say no, or are uncontactable, we - # remove them from self.permuted. This returns a Deferred which never - # errbacks. - - bucket_num = len(self.landlords) - d = self._peer.get_remote_service(peerid, "storageserver") - def _got_peer(service): - if self.debug: print "asking %s" % shortid(peerid) - d2 = service.callRemote("allocate_bucket", - verifierid=self._verifierid, - bucket_num=bucket_num, - size=self._share_size, - leaser=self._peer.nodeid, - canary=Referenceable()) - return d2 - d.addCallback(_got_peer) - - def _allocate_response(bucket): - if self.debug: - print " peerid %s will grant us a lease" % shortid(peerid) - self.peers_who_said_yes.append(peerid) - self.landlords.append( (peerid, bucket_num, bucket) ) - self.goodness_points += 1 - self.peer_index += 1 - - d.addCallback(_allocate_response) - - def _err(f): - if self.debug: print "err from peer %s:" % idlib.b2a(peerid) - assert isinstance(f, failure.Failure) - if f.check(TooFullError): - if self.debug: print " too full" - self.peers_who_said_no.append(peerid) - elif f.check(IndexError): - if self.debug: print " no connection" - self.peers_who_had_errors.append(peerid) - else: - if self.debug: print " other error:", f - self.peers_who_had_errors.append(peerid) - log.msg("FileUploader._check_peer(%s): err" % shortid(peerid)) - log.msg(f) - self.permuted.remove(peerid) # this peer was unusable - return None - d.addErrback(_err) - return d - - def _got_enough_peers(self, res): - landlords = self.landlords - if self.debug: - log.msg("FileUploader._got_enough_peers") - log.msg(" %d landlords" % len(landlords)) - if len(landlords) < 20: - log.msg(" peerids: %s" % " ".join([idlib.b2a(l[0]) - for l in landlords])) - log.msg(" buckets: %s" % " ".join([str(l[1]) - for l in landlords])) - # assign shares to landlords - self.sharemap = {} - for peerid, bucket_num, bucket in landlords: - self.sharemap[bucket_num] = bucket - # the sharemap should have exactly len(landlords) shares, with - # no holes - assert sorted(self.sharemap.keys()) == range(len(landlords)) - # encode all the data at once: this class does not use segmentation - data = self._filehandle.read() - - # xyz i am about to go away anyway. - chunksize = mathutil.div_ceil(len(data), self._needed_shares) - numchunks = mathutil.div_ceil(len(data), chunksize) - l = [ data[i:i+chunksize] for i in range(0, len(data), chunksize) ] - # padding - if len(l[-1]) != len(l[0]): - l[-1] = l[-1] + ('\x00'*(len(l[0])-len(l[-1]))) - d = self._encoder.encode(l, self.sharemap.keys()) - d.addCallback(self._send_all_shares) - d.addCallback(lambda res: self._encoder.get_serialized_params()) + def _locate_all_shareholders(self): + """ + @return: a set of PeerTracker instances that have agreed to hold some + shares for us + """ + d = self._query_peers() + def _done(res): + if not self.unallocated_sharenums: + return self._used_peers + if not self.usable_peers: + if len(self.unallocated_sharenums) < (self.total_shares - self.shares_of_happiness): + # close enough + return self._used_peers + raise NotEnoughPeersError + return self._query_peers() + d.addCallback(_done) return d - def _send_one_share(self, bucket, sharedata, metadata): - d = bucket.callRemote("write", sharedata) - d.addCallback(lambda res: - bucket.callRemote("set_metadata", metadata)) - d.addCallback(lambda res: - bucket.callRemote("close")) - return d + def _query_peers(self): + """ + @return: a deferred that fires when all queries have resolved + """ + # Choose a random starting point, talk to that peer. + self.ring_things.rotate(random.randrange(0, len(self.ring_things))) + + # Walk backwards to find a peer. We know that we'll eventually find + # one because we earlier asserted that there was at least one. + while self.ring_things[0][1] != 1: + self.ring_things.rotate(-1) + startingpoint = self.ring_things[0] + peer = startingpoint[2] + assert isinstance(peer, PeerTracker), peer + self.ring_things.rotate(-1) + + # loop invariant: at the top of the loop, we are always one step to + # the left of a peer, which is stored in the peer variable. + outstanding_queries = [] + while self.ring_things[0] != startingpoint: + # Walk backwards to find the previous peer (could be the same one). + # Accumulate all shares that we find along the way. + sharenums_to_query = set() + while self.ring_things[0][1] != 1: + sharenums_to_query.add(self.ring_things[0][2]) + self.ring_things.rotate(-1) + + d = peer.query(sharenums_to_query) + d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peer, sharenums_to_query), errbackArgs=(peer,)) + outstanding_queries.append(d) + + peer = self.ring_things[0][2] + assert isinstance(peer, PeerTracker), peer + self.ring_things.rotate(-1) + + return defer.DeferredList(outstanding_queries) + + def _got_response(self, (alreadygot, allocated), peer, shares_we_requested): + """ + @type alreadygot: a set of sharenums + @type allocated: a set of sharenums + """ + self.unallocated_sharenums -= alreadygot + self.unallocated_sharenums -= allocated + + if allocated: + self.used_peers.add(peer) + + if shares_we_requested - alreadygot - allocated: + # Then he didn't accept some of the shares, so he's full. + self.usable_peers.remove(peer) + + def _got_error(self, f, peer): + self.usable_peers -= peer + + def _send_shares(self, used_peers): + buckets = {} + for peer in used_peers: + buckets.update(peer.buckets) + assert len(buckets) == sum([len(peer.buckets) for peer in used_peers]) + self._encoder.set_shareholders(buckets) + return self._encoder.start() + + def _compute_uri(self, roothash): + params = self._encoder.get_serialized_params() + return pack_uri(self._codec_name, params, self._verifierid, roothash) - def _send_all_shares(self, (shares, shareids)): - dl = [] - for (shareid, share) in zip(shareids, shares): - if self.debug: - log.msg(" writing share %d" % shareid) - metadata = bencode.bencode(shareid) - assert len(share) == self._share_size - assert isinstance(share, str) - bucket = self.sharemap[shareid] - d = self._send_one_share(bucket, share, metadata) - dl.append(d) - return DeferredListShouldSucceed(dl) def netstring(s): return "%d:%s," % (len(s), s) @@ -293,6 +234,10 @@ class Uploader(service.MultiService): uploader_class = FileUploader debug = False + needed_shares = 25 # Number of shares required to reconstruct a file. + desired_shares = 75 # We will abort an upload unless we can allocate space for at least this many. + total_shares = 100 # Total number of shares created by encoding. If everybody has room then this is is how many we will upload. + def _compute_verifierid(self, f): hasher = sha.new(netstring("allmydata_v1_verifierid")) f.seek(0) @@ -314,7 +259,7 @@ class Uploader(service.MultiService): u.set_filehandle(fh) # push two shares, require that we get two back. TODO: this is # temporary, of course. - u.set_params(2, 2, 4) + u.set_params(self.needed_shares, self.desired_shares, self.total_shares) u.set_verifierid(self._compute_verifierid(fh)) d = u.start() def _done(res): diff --git a/src/allmydata/util/ring.py b/src/allmydata/util/ring.py new file mode 100644 index 00000000..0ffebecd --- /dev/null +++ b/src/allmydata/util/ring.py @@ -0,0 +1,13 @@ + +def distance(p1, p2, FULL = 2**160, HALF = 2**159): + """ + Distance between two points in the space, expressed as longs. + + @param p1: long of first point + @param p2: long of second point + """ + d = p2 - p1 + if d < 0: + d = FULL + d + return d +