]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
new upload and storage server
authorZooko O'Whielacronx <zooko@zooko.com>
Fri, 30 Mar 2007 03:19:52 +0000 (20:19 -0700)
committerZooko O'Whielacronx <zooko@zooko.com>
Fri, 30 Mar 2007 03:19:52 +0000 (20:19 -0700)
src/allmydata/client.py
src/allmydata/download.py
src/allmydata/encode_new.py
src/allmydata/interfaces.py
src/allmydata/storageserver.py
src/allmydata/test/test_client.py
src/allmydata/test/test_ring.py [new file with mode: 0644]
src/allmydata/test/test_upload.py
src/allmydata/upload.py
src/allmydata/util/ring.py [new file with mode: 0644]

index 72d96b24e0400d842b00029dce88aa03bb706701..6e0f9b603d55c1a525d8a7f0be96dfeaee951895 100644 (file)
@@ -7,6 +7,7 @@ from allmydata import node
 
 from twisted.internet import defer
 
+from allmydata.Crypto.Util.number import bytes_to_long
 from allmydata.storageserver import StorageServer
 from allmydata.upload import Uploader
 from allmydata.download import Downloader
@@ -101,16 +102,14 @@ class Client(node.Node, Referenceable):
     def get_all_peerids(self):
         return self.introducer_client.connections.iterkeys()
 
-    def permute_peerids(self, key, max_count=None):
-        # TODO: eventually reduce memory consumption by doing an insertion
-        # sort of at most max_count elements
+    def get_permuted_peers(self, key):
+        """
+        @return: list of (permuted-peerid, peerid, connection,)
+        """
         results = []
-        for nodeid in self.get_all_peerids():
-            assert isinstance(nodeid, str)
-            permuted = sha.new(key + nodeid).digest()
-            results.append((permuted, nodeid))
+        for peerid, connection in self.introducer_client.connections.iteritems():
+            assert isinstance(peerid, str)
+            permuted = bytes_to_long(sha.new(key + peerid).digest())
+            results.append((permuted, peerid, connection))
         results.sort()
-        results = [r[1] for r in results]
-        if max_count is None:
-            return results
-        return results[:max_count]
+        return results
index 0810c9627f6041954c6e3e64c02f86d058ebf622..f08f3a2847f84571ca049b5706641a56f637b109 100644 (file)
@@ -48,7 +48,7 @@ class FileDownloader:
         # footprint
         max_peers = None
 
-        self.permuted = self._peer.permute_peerids(self._verifierid, max_peers)
+        self.permuted = self._peer.get_permuted_connections(self._verifierid, max_peers)
         for p in self.permuted:
             assert isinstance(p, str)
         self.landlords = [] # list of (peerid, bucket_num, remotebucket)
index 5b8a784cc39229bb73b1ff7ae05899370e61710a..13c25814556bfb6811096f3703647697a7871984 100644 (file)
@@ -32,29 +32,29 @@ number of segments or log(number of segments)).
 
 
 Each segment (A,B,C) is read into memory, encrypted, and encoded into
-subshares. The 'share' (say, share #1) that makes it out to a host is a
-collection of these subshares (subshare A1, B1, C1), plus some hash-tree
+blocks. The 'share' (say, share #1) that makes it out to a host is a
+collection of these blocks (block A1, B1, C1), plus some hash-tree
 information necessary to validate the data upon retrieval. Only one segment
-is handled at a time: all subshares for segment A are delivered before any
+is handled at a time: all blocks for segment A are delivered before any
 work is begun on segment B.
 
-As subshares are created, we retain the hash of each one. The list of
-subshare hashes for a single share (say, hash(A1), hash(B1), hash(C1)) is
+As blocks are created, we retain the hash of each one. The list of
+block hashes for a single share (say, hash(A1), hash(B1), hash(C1)) is
 used to form the base of a Merkle hash tree for that share (hashtrees[1]).
-This hash tree has one terminal leaf per subshare. The complete subshare hash
+This hash tree has one terminal leaf per block. The complete block hash
 tree is sent to the shareholder after all the data has been sent. At
 retrieval time, the decoder will ask for specific pieces of this tree before
-asking for subshares, whichever it needs to validate those subshares.
+asking for blocks, whichever it needs to validate those blocks.
 
-(Note: we don't really need to generate this whole subshare hash tree
+(Note: we don't really need to generate this whole block hash tree
 ourselves. It would be sufficient to have the shareholder generate it and
 just tell us the root. This gives us an extra level of validation on the
 transfer, though, and it is relatively cheap to compute.)
 
-Each of these subshare hash trees has a root hash. The collection of these
+Each of these block hash trees has a root hash. The collection of these
 root hashes for all shares are collected into the 'share hash tree', which
-has one terminal leaf per share. After sending the subshares and the complete
-subshare hash tree to each shareholder, we send them the portion of the share
+has one terminal leaf per share. After sending the blocks and the complete
+block hash tree to each shareholder, we send them the portion of the share
 hash tree that is necessary to validate their share. The root of the share
 hash tree is put into the URI.
 
@@ -197,7 +197,7 @@ class Encoder(object):
         if False:
             block = "".join(all_hashes)
             return self.send(shareid, "write", block, offset=0)
-        return self.send(shareid, "put_subshare_hashes", all_hashes)
+        return self.send(shareid, "put_block_hashes", all_hashes)
 
     def send_all_share_hash_trees(self):
         dl = []
@@ -229,27 +229,3 @@ class Encoder(object):
 
     def done(self):
         return self.root_hash
-
-
-from foolscap import RemoteInterface
-from foolscap.schema import ListOf, TupleOf
-
-
-class RIStorageBucketWriter(RemoteInterface):
-    def put_subshare(segment_number=int, subshare=str):
-        return None
-    def put_segment_hashes(all_hashes=ListOf(str)):
-        return None
-    def put_share_hashes(needed_hashes=ListOf(TupleOf(int,str))):
-        return None
-    #def write(data=str, offset=int):
-    #    return None
-class RIStorageBucketReader(RemoteInterface):
-    def get_share_hashes():
-        return ListOf(TupleOf(int,str))
-    def get_segment_hashes(which=ListOf(int)):
-        return ListOf(str)
-    def get_subshare(segment_number=int):
-        return str
-    #def read(size=int, offset=int):
-    #    return str
index 30d1257f763609aad1ef580062cb63c18fe821ff..1cd60a7e580871313fae40e03c61a7bbc37a344a 100644 (file)
@@ -3,6 +3,9 @@ from zope.interface import Interface
 from foolscap.schema import StringConstraint, ListOf, TupleOf, Any
 from foolscap import RemoteInterface
 
+HASH_SIZE=32
+
+Hash = StringConstraint(HASH_SIZE) # binary format 32-byte SHA256 hash
 Nodeid = StringConstraint(20) # binary format 20-byte SHA1 hash
 PBURL = StringConstraint(150)
 Verifierid = StringConstraint(20)
@@ -33,29 +36,39 @@ class RIClient(RemoteInterface):
     def get_nodeid():
         return Nodeid
 
-class RIStorageServer(RemoteInterface):
-    def allocate_bucket(verifierid=Verifierid, bucket_num=int, size=int,
-                        leaser=Nodeid, canary=Referenceable_):
-        # if the canary is lost before close(), the bucket is deleted
-        return RIBucketWriter_
-    def get_buckets(verifierid=Verifierid):
-        return ListOf(TupleOf(int, RIBucketReader_))
-
 class RIBucketWriter(RemoteInterface):
-    def write(data=ShareData):
+    def put_block(segmentnum=int, data=ShareData):
+        return None
+    
+    def put_block_hashes(blockhashes=ListOf(Hash)):
         return None
-    def set_metadata(metadata=str):
+        
+    def put_share_hashes(sharehashes=ListOf(TupleOf(int, Hash))):
         return None
+
     def close():
+        """
+        If the data that has been written is incomplete or inconsistent then
+        the server will throw the data away, else it will store it for future
+        retrieval.
+        """
         return None
 
+class RIStorageServer(RemoteInterface):
+    def allocate_buckets(verifierid=Verifierid, sharenums=SetOf(int),
+                         sharesize=int, blocksize=int, canary=Referenceable_):
+        # if the canary is lost before close(), the bucket is deleted
+        return TupleOf(SetOf(int), DictOf(int, RIBucketWriter))
+    def get_buckets(verifierid=Verifierid):
+        return DictOf(int, RIBucketReader_)
 
 class RIBucketReader(RemoteInterface):
-    def read():
+    def get_block(blocknum=int):
         return ShareData
-    def get_metadata():
-        return str
-
+    def get_block_hashes():
+        return ListOf(Hash))
+    def get_share_hashes():
+        return ListOf(TupleOf(int, Hash))
 
 class RIMutableDirectoryNode(RemoteInterface):
     def list():
@@ -291,12 +304,12 @@ class ICodecDecoder(Interface):
         """
 
 class IEncoder(Interface):
-    """I take a sequence of plaintext bytes and a list of shareholders, then
-    encrypt, encode, hash, and deliver shares to those shareholders. I will
-    compute all the necessary Merkle hash trees that are necessary to
-    validate the data that eventually comes back from the shareholders. I
-    provide the root hash of the hash tree, and the encoding parameters, both
-    of which must be included in the URI.
+    """I take a file-like object that provides a sequence of bytes and a list
+    of shareholders, then encrypt, encode, hash, and deliver shares to those
+    shareholders. I will compute all the necessary Merkle hash trees that are
+    necessary to validate the data that eventually comes back from the
+    shareholders. I provide the root hash of the hash tree, and the encoding
+    parameters, both of which must be included in the URI.
 
     I do not choose shareholders, that is left to the IUploader. I must be
     given a dict of RemoteReferences to storage buckets that are ready and
@@ -408,7 +421,6 @@ class IUploader(Interface):
 
     def upload_ssk(write_capability, new_version, uploadable):
         pass # TODO
-
     def upload_data(data):
         """Like upload(), but accepts a string."""
 
index 868dc99a667163e32bdffe0bd594334621559907..4286accb241286c7141b1c70a1bfab338b6898c4 100644 (file)
@@ -3,32 +3,113 @@ import os
 from foolscap import Referenceable
 from twisted.application import service
 
-from allmydata.bucketstore import BucketStore
 from zope.interface import implements
-from allmydata.interfaces import RIStorageServer
-from allmydata.util import idlib
+from allmydata.interfaces import RIStorageServer, RIBucketWriter
+from allmydata import interfaces
+from allmydata.util import bencode, fileutil, idlib
+from allmydata.util.assertutil import _assert, precondition
 
-class BucketAlreadyExistsError(Exception):
-    pass
+# store/
+# store/tmp # temp dirs named $VERIFIERID/$SHARENUM that will be moved to store/ on success
+# store/$VERIFIERID
+# store/$VERIFIERID/$SHARENUM
+# store/$VERIFIERID/$SHARENUM/blocksize
+# store/$VERIFIERID/$SHARENUM/data
+# store/$VERIFIERID/$SHARENUM/blockhashes
+# store/$VERIFIERID/$SHARENUM/sharehashtree
 
+# $SHARENUM matches this regex:
+NUM_RE=re.compile("[1-9][0-9]*")
+
+class BucketWriter(Referenceable):
+    implements(RIBucketWriter)
+
+    def __init__(self, tmphome, finalhome, blocksize):
+        self.tmphome = tmphome
+        self.finalhome = finalhome
+        self.blocksize = blocksize
+        self.closed = False
+        self._write_file('blocksize', str(blocksize))
+
+    def _write_file(self, fname, data):
+        open(os.path.join(tmphome, fname), 'wb').write(data)
+
+    def remote_put_block(self, segmentnum, data):
+        precondition(not self.closed)
+        assert len(data) == self.blocksize
+        f = open(os.path.join(self.tmphome, 'data'), 'wb')
+        f.seek(self.blocksize*segmentnum)
+        f.write(data)
+
+    def remote_put_block_hashes(self, blockhashes):
+        precondition(not self.closed)
+        # TODO: verify the length of blockhashes.
+        # TODO: tighten foolscap schema to require exactly 32 bytes.
+        self._write_file('blockhashes', ''.join(blockhashes))
+
+    def remote_put_share_hashes(self, sharehashes):
+        precondition(not self.closed)
+        self._write_file('sharehashree', bencode.bencode(sharehashes))
+
+    def close(self):
+        precondition(not self.closed)
+        # TODO assert or check the completeness and consistency of the data that has been written
+        fileutil.rename(self.tmphome, self.finalhome)
+        self.closed = True
+
+def str2l(s):
+    """ split string (pulled from storage) into a list of blockids """
+    return [ s[i:i+interfaces.HASH_SIZE] for i in range(0, len(s), interfaces.HASH_SIZE) ]
+
+class BucketReader(Referenceable):
+    def __init__(self, home):
+        self.home = home
+        self.blocksize = int(self._read_file('blocksize'))
+
+    def _read_file(self, fname):
+        return open(os.path.join(self.home, fname), 'rb').read()
+
+    def remote_get_block(self, blocknum):
+        f = open(os.path.join(self.home, 'data'), 'rb')
+        f.seek(self.blocksize * blocknum)
+        return f.read(self.blocksize)
+
+    def remote_get_block_hashes(self):
+        return str2l(self._read_file('blockhashes'))
+
+    def remote_get_share_hashes(self):
+        return bencode.bdecode(self._read_file('sharehashes'))
+   
 class StorageServer(service.MultiService, Referenceable):
     implements(RIStorageServer)
     name = 'storageserver'
 
-    def __init__(self, store_dir):
-        if not os.path.isdir(store_dir):
-            os.mkdir(store_dir)
+    def __init__(self, storedir):
+        fileutil.make_dirs(storedir)
+        self.storedir = storedir
+        self.tmpdir = os.path.join(storedir, 'tmp')
+        self._clean_trash()
+        fileutil.make_dirs(self.tmpdir)
+
         service.MultiService.__init__(self)
-        self._bucketstore = BucketStore(store_dir)
-        self._bucketstore.setServiceParent(self)
 
-    def remote_allocate_bucket(self, verifierid, bucket_num, size, leaser,
-                               canary):
-        if self._bucketstore.has_bucket(verifierid):
-            raise BucketAlreadyExistsError()
-        lease = self._bucketstore.allocate_bucket(verifierid, bucket_num, size,
-                                                  idlib.b2a(leaser), canary)
-        return lease
+    def _clean_trash(self):
+        fileutil.rm_dir(self.tmpdir)
+
+    def remote_allocate_buckets(self, verifierid, sharenums, sharesize,
+                                blocksize, canary):
+        bucketwriters = {} # k: sharenum, v: BucketWriter
+        for sharenum in sharenums:
+            tmphome = os.path.join(self.tmpdir, idlib.a2b(verifierid), "%d"%sharenum)
+            finalhome = os.path.join(self.storedir, idlib.a2b(verifierid), "%d"%sharenum)
+            bucketwriters[sharenum] = BucketWriter(tmphome, finalhome, blocksize)
+            
+        return bucketwriters
 
     def remote_get_buckets(self, verifierid):
-        return self._bucketstore.get_buckets(verifierid)
+        bucketreaders = {} # k: sharenum, v: BucketReader
+        verifierdir = os.path.join(self.storedir, idlib.b2a(verifierid))
+        for f in os.listdir(verifierdir):
+            _assert(NUM_RE.match(f))
+            bucketreaders[int(f)] = BucketReader(os.path.join(verifierdir, f))
+        return bucketreaders
index a0ce5f2aa346c9db332117938104fb53d4eb0437..b9bbba2400db8d0fb22f02c45b576ccd4517329f 100644 (file)
@@ -2,15 +2,14 @@
 import os
 from twisted.trial import unittest
 
-from allmydata import client
+from allmydata import client, introducer
 
-class MyClient(client.Client):
-    def __init__(self, basedir):
+class MyIntroducerClient(introducer.IntroducerClient):
+    def __init__(self):
         self.connections = {}
-        client.Client.__init__(self, basedir)
 
-    def get_all_peerids(self):
-        return self.connections
+def permute(c, key):
+    return [ y for x, y, z in c.get_permuted_peers(key) ]
 
 class Basic(unittest.TestCase):
     def test_loadable(self):
@@ -25,17 +24,18 @@ class Basic(unittest.TestCase):
         os.mkdir(basedir)
         open(os.path.join(basedir, "introducer.furl"), "w").write("")
         open(os.path.join(basedir, "vdrive.furl"), "w").write("")
-        c = MyClient(basedir)
+        c = client.Client(basedir)
+        c.introducer_client = MyIntroducerClient()
         for k in ["%d" % i for i in range(5)]:
-            c.connections[k] = None
-        self.failUnlessEqual(c.permute_peerids("one"), ['3','1','0','4','2'])
-        self.failUnlessEqual(c.permute_peerids("one", 3), ['3','1','0'])
-        self.failUnlessEqual(c.permute_peerids("two"), ['0','4','2','1','3'])
-        c.connections.clear()
-        self.failUnlessEqual(c.permute_peerids("one"), [])
-
-        c2 = MyClient(basedir)
+            c.introducer_client.connections[k] = None
+        self.failUnlessEqual(permute(c, "one"), ['3','1','0','4','2'])
+        self.failUnlessEqual(permute(c, "two"), ['0','4','2','1','3'])
+        c.introducer_client.connections.clear()
+        self.failUnlessEqual(permute(c, "one"), [])
+
+        c2 = client.Client(basedir)
+        c2.introducer_client = MyIntroducerClient()
         for k in ["%d" % i for i in range(5)]:
-            c2.connections[k] = None
-        self.failUnlessEqual(c2.permute_peerids("one"), ['3','1','0','4','2'])
+            c2.introducer_client.connections[k] = None
+        self.failUnlessEqual(permute(c2, "one"), ['3','1','0','4','2'])
 
diff --git a/src/allmydata/test/test_ring.py b/src/allmydata/test/test_ring.py
new file mode 100644 (file)
index 0000000..59737f2
--- /dev/null
@@ -0,0 +1,19 @@
+#! /usr/bin/python
+
+from twisted.trial import unittest
+
+from allmydata.util import ring
+
+class Ring(unittest.TestCase):
+    def test_1(self):
+        self.failUnlessEquals(ring.distance(8, 9), 1)
+        self.failUnlessEquals(ring.distance(9, 8), 2**160-1)
+        self.failUnlessEquals(ring.distance(2, 2**160-1), 2**160-3)
+        self.failUnlessEquals(ring.distance(2**160-1, 2), 3)
+        self.failUnlessEquals(ring.distance(0, 2**159), 2**159)
+        self.failUnlessEquals(ring.distance(2**159, 0), 2**159)
+        self.failUnlessEquals(ring.distance(2**159-1, 2**159+1), 2)
+        self.failUnlessEquals(ring.distance(2**159-1, 1), 2**159+2)
+        self.failUnlessEquals(ring.distance(2**159-1, 2**159-1), 0)
+        self.failUnlessEquals(ring.distance(0, 0), 0)
+
index a17c37122fd344a87685b5123961a902799e1402..686f23b08e08f6371baa4db35b7067c208eb515b 100644 (file)
@@ -55,8 +55,7 @@ class FakeClient:
             else:
                 self.peers.append(FakePeer(str(peerid), r))
 
-    def permute_peerids(self, key, max_peers):
-        assert max_peers == None
+    def get_permuted_connections(self, key):
         return [str(i) for i in range(len(self.peers))]
 
     def get_remote_service(self, peerid, name):
@@ -202,13 +201,12 @@ class FakePeer2:
 
 class FakeClient2:
     nodeid = "fakeclient"
-    def __init__(self, max_peers):
+    def __init__(self, num_peers):
         self.peers = []
-        for peerid in range(max_peers):
+        for peerid in range(num_peers):
             self.peers.append(FakePeer2(str(peerid)))
 
-    def permute_peerids(self, key, max_peers):
-        assert max_peers == None
+    def get_permuted_connections(self, key):
         return [str(i) for i in range(len(self.peers))]
 
     def get_remote_service(self, peerid, name):
index 0fdf309dad3dc832bc3c38d31987d0464f657cee..c994c9c0de922834f9f9f65897bcfe843053205a 100644 (file)
@@ -1,19 +1,16 @@
-
 from zope.interface import implements
-from twisted.python import failure, log
+from twisted.python import log
 from twisted.internet import defer
 from twisted.application import service
 from foolscap import Referenceable
 
-from allmydata.util import idlib, bencode, mathutil
-from allmydata.util.idlib import peerid_to_short_string as shortid
-from allmydata.util.deferredutil import DeferredListShouldSucceed
+from allmydata.util import idlib, mathutil
 from allmydata import codec
 from allmydata.uri import pack_uri
 from allmydata.interfaces import IUploadable, IUploader
 
 from cStringIO import StringIO
-import sha
+import collections, random, sha
 
 class NotEnoughPeersError(Exception):
     pass
@@ -26,19 +23,38 @@ class HaveAllPeersError(Exception):
 class TooFullError(Exception):
     pass
 
+class PeerTracker:
+    def __init__(self, peerid, connection, sharesize, blocksize, verifierid):
+        self.peerid = peerid
+        self.connection = connection
+        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
+        self.sharesize = sharesize
+        self.blocksize = blocksize
+        self.verifierid = verifierid
+
+    def query(self, sharenums):
+        d = self.connection.callRemote("allocate_buckets", self._verifierid,
+                                       sharenums, self.sharesize,
+                                       self.blocksize, canary=Referenceable())
+        d.addCallback(self._got_reply)
+        return d
+        
+    def _got_reply(self, (alreadygot, buckets)):
+        self.buckets.update(buckets)
+        return (alreadygot, set(buckets.keys()))
 
 class FileUploader:
     debug = False
     ENCODERCLASS = codec.CRSEncoder
 
 
-    def __init__(self, peer):
-        self._peer = peer
+    def __init__(self, client):
+        self._client = client
 
-    def set_params(self, min_shares, target_goodness, max_shares):
-        self.min_shares = min_shares
-        self.target_goodness = target_goodness
-        self.max_shares = max_shares
+    def set_params(self, needed_shares, shares_of_happiness, total_shares):
+        self.needed_shares = needed_shares
+        self.shares_of_happiness = shares_of_happiness
+        self.total_shares = total_shares
 
     def set_filehandle(self, filehandle):
         self._filehandle = filehandle
@@ -64,195 +80,120 @@ class FileUploader:
         log.msg("starting upload [%s]" % (idlib.b2a(self._verifierid),))
         if self.debug:
             print "starting upload"
-        assert self.min_shares
-        assert self.target_goodness
+        assert self.needed_shares
 
         # create the encoder, so we can know how large the shares will be
-        total_shares = self.max_shares
-        needed_shares = self.min_shares
         self._encoder = self.ENCODERCLASS()
         self._codec_name = self._encoder.get_encoder_type()
-        self._needed_shares = needed_shares
-        paddedsize = self._size + mathutil.pad_size(self._size, needed_shares)
-        self._encoder.set_params(paddedsize, needed_shares, total_shares)
+        paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
+        self._encoder.set_params(paddedsize, self.needed_shares, self.total_shares)
         self._share_size = self._encoder.get_share_size()
 
         # first step: who should we upload to?
-
-        # We will talk to at most max_peers (which can be None to mean no
-        # limit). Maybe limit max_peers to 2*len(self.shares), to reduce
-        # memory footprint. For now, make it unlimited.
-        max_peers = None
-
-        self.permuted = self._peer.permute_peerids(self._verifierid, max_peers)
-        self.peers_who_said_yes = []
-        self.peers_who_said_no = []
-        self.peers_who_had_errors = []
-
-        self._total_peers = len(self.permuted)
-        for p in self.permuted:
-            assert isinstance(p, str)
-        # we will shrink self.permuted as we give up on peers
-
-        d = defer.maybeDeferred(self._find_peers)
-        d.addCallback(self._got_enough_peers)
+        peers = self._client.get_permuted_peers(self._verifierid)
+        assert peers
+        trackers = [ (permutedid, PeerTracker(peerid, conn),)
+                     for permutedid, peerid, conn in peers ]
+        ring_things = [] # a list of (position_in_ring, whatami, x) where whatami is 0 if x is a sharenum or else 1 if x is a PeerTracker instance
+        ring_things.extend([ (permutedpeerid, 1, peer,) for permutedpeerid, peer in trackers ])
+        shares = [ (i * 2**160 / self.total_shares, 0, i) for i in range(self.total_shares) ]
+        ring_things.extend(shares)
+        ring_things.sort()
+        self.ring_things = collections.deque(ring_things)
+        self.usable_peers = set([peer for permutedid, peer in trackers])
+        self.used_peers = set()
+        self.unallocated_sharenums = set(shares)
+
+        d = self._locate_all_shareholders()
+        d.addCallback(self._send_shares)
         d.addCallback(self._compute_uri)
         return d
 
-    def _compute_uri(self, params):
-        return pack_uri(self._codec_name, params, self._verifierid)
-
-    def _build_not_enough_peers_error(self):
-        yes = ",".join([shortid(p) for p in self.peers_who_said_yes])
-        no = ",".join([shortid(p) for p in self.peers_who_said_no])
-        err = ",".join([shortid(p) for p in self.peers_who_had_errors])
-        msg = ("%s goodness, want %s, have %d "
-               "landlords, %d total peers, "
-               "peers:yes=%s;no=%s;err=%s" %
-               (self.goodness_points, self.target_goodness,
-                len(self.landlords), self._total_peers,
-                yes, no, err))
-        return msg
-
-    def _find_peers(self):
-        # this returns a Deferred which fires (with a meaningless value) when
-        # enough peers are found, or errbacks with a NotEnoughPeersError if
-        # not.
-        self.peer_index = 0
-        self.goodness_points = 0
-        self.landlords = [] # list of (peerid, bucket_num, remotebucket)
-        return self._check_next_peer()
-
-    def _check_next_peer(self):
-        if self.debug:
-            log.msg("FileUploader._check_next_peer: %d permuted, %d goodness"
-                    " (want %d), have %d landlords, %d total peers" %
-                    (len(self.permuted), self.goodness_points,
-                     self.target_goodness, len(self.landlords),
-                     self._total_peers))
-        if (self.goodness_points >= self.target_goodness and
-            len(self.landlords) >= self.min_shares):
-            if self.debug: print " we're done!"
-            return "done"
-        if not self.permuted:
-            # we've run out of peers to check without finding enough, which
-            # means we won't be able to upload this file. Bummer.
-            msg = self._build_not_enough_peers_error()
-            log.msg("NotEnoughPeersError: %s" % msg)
-            raise NotEnoughPeersError(msg)
-
-        # otherwise we use self.peer_index to rotate through all the usable
-        # peers. It gets inremented elsewhere, but wrapped here.
-        if self.peer_index >= len(self.permuted):
-            self.peer_index = 0
-
-        peerid = self.permuted[self.peer_index]
-
-        d = self._check_peer(peerid)
-        d.addCallback(lambda res: self._check_next_peer())
-        return d
-
-    def _check_peer(self, peerid):
-        # contact a single peer, and ask them to hold a share. If they say
-        # yes, we update self.landlords and self.goodness_points, and
-        # increment self.peer_index. If they say no, or are uncontactable, we
-        # remove them from self.permuted. This returns a Deferred which never
-        # errbacks.
-
-        bucket_num = len(self.landlords)
-        d = self._peer.get_remote_service(peerid, "storageserver")
-        def _got_peer(service):
-            if self.debug: print "asking %s" % shortid(peerid)
-            d2 = service.callRemote("allocate_bucket",
-                                    verifierid=self._verifierid,
-                                    bucket_num=bucket_num,
-                                    size=self._share_size,
-                                    leaser=self._peer.nodeid,
-                                    canary=Referenceable())
-            return d2
-        d.addCallback(_got_peer)
-
-        def _allocate_response(bucket):
-            if self.debug:
-                print " peerid %s will grant us a lease" % shortid(peerid)
-            self.peers_who_said_yes.append(peerid)
-            self.landlords.append( (peerid, bucket_num, bucket) )
-            self.goodness_points += 1
-            self.peer_index += 1
-
-        d.addCallback(_allocate_response)
-
-        def _err(f):
-            if self.debug: print "err from peer %s:" % idlib.b2a(peerid)
-            assert isinstance(f, failure.Failure)
-            if f.check(TooFullError):
-                if self.debug: print " too full"
-                self.peers_who_said_no.append(peerid)
-            elif f.check(IndexError):
-                if self.debug: print " no connection"
-                self.peers_who_had_errors.append(peerid)
-            else:
-                if self.debug: print " other error:", f
-                self.peers_who_had_errors.append(peerid)
-                log.msg("FileUploader._check_peer(%s): err" % shortid(peerid))
-                log.msg(f)
-            self.permuted.remove(peerid) # this peer was unusable
-            return None
-        d.addErrback(_err)
-        return d
-
-    def _got_enough_peers(self, res):
-        landlords = self.landlords
-        if self.debug:
-            log.msg("FileUploader._got_enough_peers")
-            log.msg(" %d landlords" % len(landlords))
-            if len(landlords) < 20:
-                log.msg(" peerids: %s" % " ".join([idlib.b2a(l[0])
-                                                   for l in landlords]))
-                log.msg(" buckets: %s" % " ".join([str(l[1])
-                                                   for l in landlords]))
-        # assign shares to landlords
-        self.sharemap = {}
-        for peerid, bucket_num, bucket in landlords:
-            self.sharemap[bucket_num] = bucket
-        # the sharemap should have exactly len(landlords) shares, with
-        # no holes
-        assert sorted(self.sharemap.keys()) == range(len(landlords))
-        # encode all the data at once: this class does not use segmentation
-        data = self._filehandle.read()
-
-        # xyz i am about to go away anyway.
-        chunksize = mathutil.div_ceil(len(data), self._needed_shares)
-        numchunks = mathutil.div_ceil(len(data), chunksize)
-        l = [ data[i:i+chunksize] for i in range(0, len(data), chunksize) ]
-        # padding
-        if len(l[-1]) != len(l[0]):
-            l[-1] = l[-1] + ('\x00'*(len(l[0])-len(l[-1])))
-        d = self._encoder.encode(l, self.sharemap.keys())
-        d.addCallback(self._send_all_shares)
-        d.addCallback(lambda res: self._encoder.get_serialized_params())
+    def _locate_all_shareholders(self):
+        """
+        @return: a set of PeerTracker instances that have agreed to hold some
+            shares for us
+        """
+        d = self._query_peers()
+        def _done(res):
+            if not self.unallocated_sharenums:
+                return self._used_peers
+            if not self.usable_peers:
+                if len(self.unallocated_sharenums) < (self.total_shares - self.shares_of_happiness):
+                    # close enough
+                    return self._used_peers
+                raise NotEnoughPeersError
+            return self._query_peers()
+        d.addCallback(_done)
         return d
 
-    def _send_one_share(self, bucket, sharedata, metadata):
-        d = bucket.callRemote("write", sharedata)
-        d.addCallback(lambda res:
-                      bucket.callRemote("set_metadata", metadata))
-        d.addCallback(lambda res:
-                      bucket.callRemote("close"))
-        return d
+    def _query_peers(self):
+        """
+        @return: a deferred that fires when all queries have resolved
+        """
+        # Choose a random starting point, talk to that peer.
+        self.ring_things.rotate(random.randrange(0, len(self.ring_things)))
+
+        # Walk backwards to find a peer.  We know that we'll eventually find
+        # one because we earlier asserted that there was at least one.
+        while self.ring_things[0][1] != 1:
+            self.ring_things.rotate(-1)
+        startingpoint = self.ring_things[0]
+        peer = startingpoint[2]
+        assert isinstance(peer, PeerTracker), peer
+        self.ring_things.rotate(-1)
+
+        # loop invariant: at the top of the loop, we are always one step to
+        # the left of a peer, which is stored in the peer variable.
+        outstanding_queries = []
+        while self.ring_things[0] != startingpoint:
+            # Walk backwards to find the previous peer (could be the same one).
+            # Accumulate all shares that we find along the way.
+            sharenums_to_query = set()
+            while self.ring_things[0][1] != 1:
+                sharenums_to_query.add(self.ring_things[0][2])
+                self.ring_things.rotate(-1)
+
+            d = peer.query(sharenums_to_query)
+            d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peer, sharenums_to_query), errbackArgs=(peer,))
+            outstanding_queries.append(d)
+
+            peer = self.ring_things[0][2]
+            assert isinstance(peer, PeerTracker), peer
+            self.ring_things.rotate(-1)
+
+        return defer.DeferredList(outstanding_queries)
+
+    def _got_response(self, (alreadygot, allocated), peer, shares_we_requested):
+        """
+        @type alreadygot: a set of sharenums
+        @type allocated: a set of sharenums
+        """
+        self.unallocated_sharenums -= alreadygot
+        self.unallocated_sharenums -= allocated
+
+        if allocated:
+            self.used_peers.add(peer)
+
+        if shares_we_requested - alreadygot - allocated:
+            # Then he didn't accept some of the shares, so he's full.
+            self.usable_peers.remove(peer)
+
+    def _got_error(self, f, peer):
+        self.usable_peers -= peer
+
+    def _send_shares(self, used_peers):
+        buckets = {}
+        for peer in used_peers:
+            buckets.update(peer.buckets)
+        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
+        self._encoder.set_shareholders(buckets)
+        return self._encoder.start()
+
+    def _compute_uri(self, roothash):
+        params = self._encoder.get_serialized_params()
+        return pack_uri(self._codec_name, params, self._verifierid, roothash)
 
-    def _send_all_shares(self, (shares, shareids)):
-        dl = []
-        for (shareid, share) in zip(shareids, shares):
-            if self.debug:
-                log.msg(" writing share %d" % shareid)
-            metadata = bencode.bencode(shareid)
-            assert len(share) == self._share_size
-            assert isinstance(share, str)
-            bucket = self.sharemap[shareid]
-            d = self._send_one_share(bucket, share, metadata)
-            dl.append(d)
-        return DeferredListShouldSucceed(dl)
 
 def netstring(s):
     return "%d:%s," % (len(s), s)
@@ -293,6 +234,10 @@ class Uploader(service.MultiService):
     uploader_class = FileUploader
     debug = False
 
+    needed_shares = 25 # Number of shares required to reconstruct a file.
+    desired_shares = 75 # We will abort an upload unless we can allocate space for at least this many.
+    total_shares = 100 # Total number of shares created by encoding.  If everybody has room then this is is how many we will upload.
+
     def _compute_verifierid(self, f):
         hasher = sha.new(netstring("allmydata_v1_verifierid"))
         f.seek(0)
@@ -314,7 +259,7 @@ class Uploader(service.MultiService):
         u.set_filehandle(fh)
         # push two shares, require that we get two back. TODO: this is
         # temporary, of course.
-        u.set_params(2, 2, 4)
+        u.set_params(self.needed_shares, self.desired_shares, self.total_shares)
         u.set_verifierid(self._compute_verifierid(fh))
         d = u.start()
         def _done(res):
diff --git a/src/allmydata/util/ring.py b/src/allmydata/util/ring.py
new file mode 100644 (file)
index 0000000..0ffebec
--- /dev/null
@@ -0,0 +1,13 @@
+
+def distance(p1, p2, FULL = 2**160, HALF = 2**159):
+    """
+    Distance between two points in the space, expressed as longs.
+
+    @param p1: long of first point
+    @param p2: long of second point
+    """
+    d = p2 - p1
+    if d < 0:
+        d = FULL + d
+    return d
+