from twisted.internet import defer
+from allmydata.Crypto.Util.number import bytes_to_long
from allmydata.storageserver import StorageServer
from allmydata.upload import Uploader
from allmydata.download import Downloader
def get_all_peerids(self):
return self.introducer_client.connections.iterkeys()
- def permute_peerids(self, key, max_count=None):
- # TODO: eventually reduce memory consumption by doing an insertion
- # sort of at most max_count elements
+ def get_permuted_peers(self, key):
+ """
+ Permute the currently connected peers for the given key.
+
+ Each peerid is mapped to bytes_to_long(sha.new(key + peerid).digest())
+ and the resulting tuples are sorted in ascending order, so the same
+ key always yields the same ordering for the same set of peers.
+
+ @param key: string that is prepended to every peerid before hashing
+ @return: sorted list of (permuted-peerid, peerid, connection,) tuples,
+ where permuted-peerid is a long
+ """
results = []
- for nodeid in self.get_all_peerids():
- assert isinstance(nodeid, str)
- permuted = sha.new(key + nodeid).digest()
- results.append((permuted, nodeid))
+ for peerid, connection in self.introducer_client.connections.iteritems():
+ assert isinstance(peerid, str)
+ permuted = bytes_to_long(sha.new(key + peerid).digest())
+ results.append((permuted, peerid, connection))
results.sort()
- results = [r[1] for r in results]
- if max_count is None:
- return results
- return results[:max_count]
+ return results
# footprint
max_peers = None
- self.permuted = self._peer.permute_peerids(self._verifierid, max_peers)
+ self.permuted = self._peer.get_permuted_connections(self._verifierid, max_peers)
for p in self.permuted:
assert isinstance(p, str)
self.landlords = [] # list of (peerid, bucket_num, remotebucket)
Each segment (A,B,C) is read into memory, encrypted, and encoded into
-subshares. The 'share' (say, share #1) that makes it out to a host is a
-collection of these subshares (subshare A1, B1, C1), plus some hash-tree
+blocks. The 'share' (say, share #1) that makes it out to a host is a
+collection of these blocks (block A1, B1, C1), plus some hash-tree
information necessary to validate the data upon retrieval. Only one segment
-is handled at a time: all subshares for segment A are delivered before any
+is handled at a time: all blocks for segment A are delivered before any
work is begun on segment B.
-As subshares are created, we retain the hash of each one. The list of
-subshare hashes for a single share (say, hash(A1), hash(B1), hash(C1)) is
+As blocks are created, we retain the hash of each one. The list of
+block hashes for a single share (say, hash(A1), hash(B1), hash(C1)) is
used to form the base of a Merkle hash tree for that share (hashtrees[1]).
-This hash tree has one terminal leaf per subshare. The complete subshare hash
+This hash tree has one terminal leaf per block. The complete block hash
tree is sent to the shareholder after all the data has been sent. At
retrieval time, the decoder will ask for specific pieces of this tree before
-asking for subshares, whichever it needs to validate those subshares.
+asking for blocks, whichever it needs to validate those blocks.
-(Note: we don't really need to generate this whole subshare hash tree
+(Note: we don't really need to generate this whole block hash tree
ourselves. It would be sufficient to have the shareholder generate it and
just tell us the root. This gives us an extra level of validation on the
transfer, though, and it is relatively cheap to compute.)
-Each of these subshare hash trees has a root hash. The collection of these
+Each of these block hash trees has a root hash. The collection of these
root hashes for all shares are collected into the 'share hash tree', which
-has one terminal leaf per share. After sending the subshares and the complete
-subshare hash tree to each shareholder, we send them the portion of the share
+has one terminal leaf per share. After sending the blocks and the complete
+block hash tree to each shareholder, we send them the portion of the share
hash tree that is necessary to validate their share. The root of the share
hash tree is put into the URI.
if False:
block = "".join(all_hashes)
return self.send(shareid, "write", block, offset=0)
- return self.send(shareid, "put_subshare_hashes", all_hashes)
+ return self.send(shareid, "put_block_hashes", all_hashes)
def send_all_share_hash_trees(self):
dl = []
def done(self):
return self.root_hash
-
-
-from foolscap import RemoteInterface
-from foolscap.schema import ListOf, TupleOf
-
-
-class RIStorageBucketWriter(RemoteInterface):
- def put_subshare(segment_number=int, subshare=str):
- return None
- def put_segment_hashes(all_hashes=ListOf(str)):
- return None
- def put_share_hashes(needed_hashes=ListOf(TupleOf(int,str))):
- return None
- #def write(data=str, offset=int):
- # return None
-class RIStorageBucketReader(RemoteInterface):
- def get_share_hashes():
- return ListOf(TupleOf(int,str))
- def get_segment_hashes(which=ListOf(int)):
- return ListOf(str)
- def get_subshare(segment_number=int):
- return str
- #def read(size=int, offset=int):
- # return str
from foolscap.schema import StringConstraint, ListOf, TupleOf, Any
from foolscap import RemoteInterface
+HASH_SIZE=32
+
+Hash = StringConstraint(HASH_SIZE) # binary format 32-byte SHA256 hash
Nodeid = StringConstraint(20) # binary format 20-byte SHA1 hash
PBURL = StringConstraint(150)
Verifierid = StringConstraint(20)
def get_nodeid():
return Nodeid
-class RIStorageServer(RemoteInterface):
- def allocate_bucket(verifierid=Verifierid, bucket_num=int, size=int,
- leaser=Nodeid, canary=Referenceable_):
- # if the canary is lost before close(), the bucket is deleted
- return RIBucketWriter_
- def get_buckets(verifierid=Verifierid):
- return ListOf(TupleOf(int, RIBucketReader_))
-
class RIBucketWriter(RemoteInterface):
- def write(data=ShareData):
+ def put_block(segmentnum=int, data=ShareData):
+ return None
+
+ def put_block_hashes(blockhashes=ListOf(Hash)):
return None
- def set_metadata(metadata=str):
+
+ def put_share_hashes(sharehashes=ListOf(TupleOf(int, Hash))):
return None
+
def close():
+ """
+ If the data that has been written is incomplete or inconsistent then
+ the server will throw the data away, else it will store it for future
+ retrieval.
+ """
return None
+class RIStorageServer(RemoteInterface):
+ # NOTE(review): SetOf and DictOf are used below, but the visible
+ # "from foolscap.schema import ..." line only brings in StringConstraint,
+ # ListOf, TupleOf and Any -- confirm they are imported somewhere.
+ def allocate_buckets(verifierid=Verifierid, sharenums=SetOf(int),
+ sharesize=int, blocksize=int, canary=Referenceable_):
+ """
+ Request buckets for the given share numbers of one file.
+
+ @return: a pair of (set of sharenums, dict mapping sharenum to
+ RIBucketWriter). The uploader calls the first element 'alreadygot';
+ presumably it lists shares the server already holds -- TODO confirm.
+ """
+ # if the canary is lost before close(), the bucket is deleted
+ return TupleOf(SetOf(int), DictOf(int, RIBucketWriter))
+ def get_buckets(verifierid=Verifierid):
+ """
+ @return: dict mapping sharenum to a bucket reader for that share
+ """
+ return DictOf(int, RIBucketReader_)
class RIBucketReader(RemoteInterface):
- def read():
+ def get_block(blocknum=int):
return ShareData
- def get_metadata():
- return str
-
+    def get_block_hashes():
+        """
+        @return: the list of block hashes stored for this share
+        """
+        # The original line had an unbalanced ')' which made the whole
+        # module fail to import.
+        return ListOf(Hash)
+ def get_share_hashes():
+ return ListOf(TupleOf(int, Hash))
class RIMutableDirectoryNode(RemoteInterface):
def list():
"""
class IEncoder(Interface):
- """I take a sequence of plaintext bytes and a list of shareholders, then
- encrypt, encode, hash, and deliver shares to those shareholders. I will
- compute all the necessary Merkle hash trees that are necessary to
- validate the data that eventually comes back from the shareholders. I
- provide the root hash of the hash tree, and the encoding parameters, both
- of which must be included in the URI.
+ """I take a file-like object that provides a sequence of bytes and a list
+ of shareholders, then encrypt, encode, hash, and deliver shares to those
+ shareholders. I will compute all the necessary Merkle hash trees that are
+ necessary to validate the data that eventually comes back from the
+ shareholders. I provide the root hash of the hash tree, and the encoding
+ parameters, both of which must be included in the URI.
I do not choose shareholders, that is left to the IUploader. I must be
given a dict of RemoteReferences to storage buckets that are ready and
def upload_ssk(write_capability, new_version, uploadable):
pass # TODO
-
def upload_data(data):
"""Like upload(), but accepts a string."""
from foolscap import Referenceable
from twisted.application import service
-from allmydata.bucketstore import BucketStore
from zope.interface import implements
-from allmydata.interfaces import RIStorageServer
-from allmydata.util import idlib
+from allmydata.interfaces import RIStorageServer, RIBucketWriter
+from allmydata import interfaces
+from allmydata.util import bencode, fileutil, idlib
+from allmydata.util.assertutil import _assert, precondition
-class BucketAlreadyExistsError(Exception):
- pass
+# store/
+# store/tmp # temp dirs named $VERIFIERID/$SHARENUM that will be moved to store/ on success
+# store/$VERIFIERID
+# store/$VERIFIERID/$SHARENUM
+# store/$VERIFIERID/$SHARENUM/blocksize
+# store/$VERIFIERID/$SHARENUM/data
+# store/$VERIFIERID/$SHARENUM/blockhashes
+# store/$VERIFIERID/$SHARENUM/sharehashes
+# $SHARENUM is the decimal share number exactly as "%d" formats it, so "0" is
+# a valid name and there are no leading zeros. The pattern is anchored at
+# both ends so that names such as "1abc" are rejected instead of reaching
+# int().
+# NOTE(review): the import of 're' is not visible in this hunk -- confirm
+# that this module imports it.
+NUM_RE=re.compile("^(0|[1-9][0-9]*)$")
+
+class BucketWriter(Referenceable):
+    """
+    Receives the pieces of one share and stages them in a temporary
+    directory (tmphome). When the writer is closed the whole directory is
+    renamed to its final location (finalhome).
+
+    Files written into tmphome: 'blocksize', 'data', 'blockhashes' and
+    'sharehashes'.
+    """
+    implements(RIBucketWriter)
+
+    def __init__(self, tmphome, finalhome, blocksize):
+        """
+        @param tmphome: directory in which the share is staged
+        @param finalhome: directory the share is moved to on close
+        @param blocksize: size in bytes of every block of this share
+        """
+        self.tmphome = tmphome
+        self.finalhome = finalhome
+        self.blocksize = blocksize
+        self.closed = False
+        # The per-share staging directory must exist before anything can be
+        # written into it.
+        fileutil.make_dirs(tmphome)
+        self._write_file('blocksize', str(blocksize))
+
+    def _write_file(self, fname, data):
+        """ Write data to the file named fname inside tmphome. """
+        # The original referenced the bare name 'tmphome', which is not
+        # defined in this scope.
+        f = open(os.path.join(self.tmphome, fname), 'wb')
+        try:
+            f.write(data)
+        finally:
+            f.close()
+
+    def remote_put_block(self, segmentnum, data):
+        """ Store one block at offset blocksize*segmentnum of 'data'. """
+        precondition(not self.closed)
+        assert len(data) == self.blocksize
+        datapath = os.path.join(self.tmphome, 'data')
+        # Opening with 'wb' every time would truncate the file and throw
+        # away all previously written blocks, so only create the file on
+        # the first write and update it in place afterwards.
+        if os.path.exists(datapath):
+            mode = 'r+b'
+        else:
+            mode = 'w+b'
+        f = open(datapath, mode)
+        try:
+            f.seek(self.blocksize*segmentnum)
+            f.write(data)
+        finally:
+            f.close()
+
+    def remote_put_block_hashes(self, blockhashes):
+        """ Store the block hashes, concatenated, in 'blockhashes'. """
+        precondition(not self.closed)
+        # TODO: verify the length of blockhashes.
+        # TODO: tighten foolscap schema to require exactly 32 bytes.
+        self._write_file('blockhashes', ''.join(blockhashes))
+
+    def remote_put_share_hashes(self, sharehashes):
+        """ Store the bencoded share hashes in 'sharehashes'. """
+        precondition(not self.closed)
+        # The file name must match the one BucketReader opens
+        # ('sharehashes'); the original wrote 'sharehashree'.
+        self._write_file('sharehashes', bencode.bencode(sharehashes))
+
+    def remote_close(self):
+        """ Move the staged share from tmphome to finalhome. """
+        precondition(not self.closed)
+        # TODO assert or check the completeness and consistency of the data that has been written
+        # The parent of finalhome (store/$VERIFIERID) may not exist yet.
+        fileutil.make_dirs(os.path.dirname(self.finalhome))
+        fileutil.rename(self.tmphome, self.finalhome)
+        self.closed = True
+
+    # Every other remote method of this class carries the remote_ prefix;
+    # the plain name is kept as an alias for existing local callers.
+    close = remote_close
+
+def str2l(s):
+    """
+    Cut a string (pulled from storage) into consecutive pieces of
+    interfaces.HASH_SIZE bytes and return them as a list. The last piece is
+    shorter if len(s) is not a multiple of HASH_SIZE.
+    """
+    hashsize = interfaces.HASH_SIZE
+    pieces = []
+    for start in range(0, len(s), hashsize):
+        pieces.append(s[start:start+hashsize])
+    return pieces
+
+class BucketReader(Referenceable):
+    """
+    Serves the contents of one stored share directory (home): the blocks in
+    'data', the block hashes in 'blockhashes' and the share hashes in
+    'sharehashes'.
+    """
+    def __init__(self, home):
+        """
+        @param home: directory holding the share; its 'blocksize' file is
+                     read immediately
+        """
+        self.home = home
+        self.blocksize = int(self._read_file('blocksize'))
+
+    def _read_file(self, fname):
+        """ Return the whole contents of the file named fname in home. """
+        f = open(os.path.join(self.home, fname), 'rb')
+        try:
+            return f.read()
+        finally:
+            # close explicitly instead of leaking the handle
+            f.close()
+
+    def remote_get_block(self, blocknum):
+        """ Return the blocksize bytes at offset blocksize*blocknum. """
+        f = open(os.path.join(self.home, 'data'), 'rb')
+        try:
+            f.seek(self.blocksize * blocknum)
+            return f.read(self.blocksize)
+        finally:
+            f.close()
+
+    def remote_get_block_hashes(self):
+        """ Return the stored block hashes as a list of strings. """
+        return str2l(self._read_file('blockhashes'))
+
+    def remote_get_share_hashes(self):
+        """ Return the bdecoded contents of 'sharehashes'. """
+        return bencode.bdecode(self._read_file('sharehashes'))
+
class StorageServer(service.MultiService, Referenceable):
implements(RIStorageServer)
name = 'storageserver'
- def __init__(self, store_dir):
- if not os.path.isdir(store_dir):
- os.mkdir(store_dir)
+ def __init__(self, storedir):
+ fileutil.make_dirs(storedir)
+ self.storedir = storedir
+ self.tmpdir = os.path.join(storedir, 'tmp')
+ self._clean_trash()
+ fileutil.make_dirs(self.tmpdir)
+
service.MultiService.__init__(self)
- self._bucketstore = BucketStore(store_dir)
- self._bucketstore.setServiceParent(self)
- def remote_allocate_bucket(self, verifierid, bucket_num, size, leaser,
- canary):
- if self._bucketstore.has_bucket(verifierid):
- raise BucketAlreadyExistsError()
- lease = self._bucketstore.allocate_bucket(verifierid, bucket_num, size,
- idlib.b2a(leaser), canary)
- return lease
+ def _clean_trash(self):
+ """
+ Remove self.tmpdir with fileutil.rm_dir. __init__ calls this right
+ before re-creating the directory.
+ """
+ fileutil.rm_dir(self.tmpdir)
+
+    def remote_allocate_buckets(self, verifierid, sharenums, sharesize,
+                                blocksize, canary):
+        """
+        Create a BucketWriter for every requested share that is not already
+        stored.
+
+        @param verifierid: binary verifierid of the file
+        @param sharenums: iterable of share numbers the caller wants to store
+        @param sharesize: not used by this implementation
+        @param blocksize: block size handed to every BucketWriter
+        @param canary: not used by this implementation
+        @return: (alreadygot, bucketwriters): the set of requested sharenums
+                 whose final directory already exists, and a dict mapping
+                 each remaining sharenum to its BucketWriter. This is the
+                 pair shape that RIStorageServer.allocate_buckets declares.
+        """
+        alreadygot = set()
+        bucketwriters = {} # k: sharenum, v: BucketWriter
+        # Directory names are the ASCII form of the binary verifierid, the
+        # same conversion remote_get_buckets uses (b2a, not a2b).
+        verifierdirname = idlib.b2a(verifierid)
+        for sharenum in sharenums:
+            sharename = "%d" % sharenum
+            finalhome = os.path.join(self.storedir, verifierdirname, sharename)
+            if os.path.exists(finalhome):
+                # already stored: do not stage a second copy on top of it
+                alreadygot.add(sharenum)
+                continue
+            tmphome = os.path.join(self.tmpdir, verifierdirname, sharename)
+            bucketwriters[sharenum] = BucketWriter(tmphome, finalhome, blocksize)
+
+        return (alreadygot, bucketwriters)
def remote_get_buckets(self, verifierid):
- return self._bucketstore.get_buckets(verifierid)
+        """
+        @param verifierid: binary verifierid of the file
+        @return: dict mapping sharenum to a BucketReader for every share
+                 directory stored under this verifierid; empty if nothing is
+                 stored for it
+        """
+        bucketreaders = {} # k: sharenum, v: BucketReader
+        verifierdir = os.path.join(self.storedir, idlib.b2a(verifierid))
+        if not os.path.isdir(verifierdir):
+            # Nothing stored for this verifierid. os.listdir would raise
+            # here, but "no shares" is an ordinary answer to this query.
+            return bucketreaders
+        for f in os.listdir(verifierdir):
+            _assert(NUM_RE.match(f))
+            bucketreaders[int(f)] = BucketReader(os.path.join(verifierdir, f))
+        return bucketreaders
import os
from twisted.trial import unittest
-from allmydata import client
+from allmydata import client, introducer
-class MyClient(client.Client):
- def __init__(self, basedir):
+class MyIntroducerClient(introducer.IntroducerClient):
+ def __init__(self):
self.connections = {}
- client.Client.__init__(self, basedir)
- def get_all_peerids(self):
- return self.connections
+def permute(c, key):
+    """
+    Return only the peerids, in permuted order, from
+    c.get_permuted_peers(key).
+    """
+    peerids = []
+    for entry in c.get_permuted_peers(key):
+        permutedid, peerid, connection = entry
+        peerids.append(peerid)
+    return peerids
class Basic(unittest.TestCase):
def test_loadable(self):
os.mkdir(basedir)
open(os.path.join(basedir, "introducer.furl"), "w").write("")
open(os.path.join(basedir, "vdrive.furl"), "w").write("")
- c = MyClient(basedir)
+ c = client.Client(basedir)
+ c.introducer_client = MyIntroducerClient()
for k in ["%d" % i for i in range(5)]:
- c.connections[k] = None
- self.failUnlessEqual(c.permute_peerids("one"), ['3','1','0','4','2'])
- self.failUnlessEqual(c.permute_peerids("one", 3), ['3','1','0'])
- self.failUnlessEqual(c.permute_peerids("two"), ['0','4','2','1','3'])
- c.connections.clear()
- self.failUnlessEqual(c.permute_peerids("one"), [])
-
- c2 = MyClient(basedir)
+ c.introducer_client.connections[k] = None
+ self.failUnlessEqual(permute(c, "one"), ['3','1','0','4','2'])
+ self.failUnlessEqual(permute(c, "two"), ['0','4','2','1','3'])
+ c.introducer_client.connections.clear()
+ self.failUnlessEqual(permute(c, "one"), [])
+
+ c2 = client.Client(basedir)
+ c2.introducer_client = MyIntroducerClient()
for k in ["%d" % i for i in range(5)]:
- c2.connections[k] = None
- self.failUnlessEqual(c2.permute_peerids("one"), ['3','1','0','4','2'])
+ c2.introducer_client.connections[k] = None
+ self.failUnlessEqual(permute(c2, "one"), ['3','1','0','4','2'])
--- /dev/null
+#! /usr/bin/python
+
+from twisted.trial import unittest
+
+from allmydata.util import ring
+
+class Ring(unittest.TestCase):
+    def test_1(self):
+        # (p1, p2, expected ring.distance(p1, p2)) on the 2**160 ring
+        cases = [
+            (8, 9, 1),
+            (9, 8, 2**160-1),
+            (2, 2**160-1, 2**160-3),
+            (2**160-1, 2, 3),
+            (0, 2**159, 2**159),
+            (2**159, 0, 2**159),
+            (2**159-1, 2**159+1, 2),
+            (2**159-1, 1, 2**159+2),
+            (2**159-1, 2**159-1, 0),
+            (0, 0, 0),
+            ]
+        for p1, p2, expected in cases:
+            self.failUnlessEquals(ring.distance(p1, p2), expected)
+
else:
self.peers.append(FakePeer(str(peerid), r))
- def permute_peerids(self, key, max_peers):
- assert max_peers == None
+ def get_permuted_connections(self, key):
return [str(i) for i in range(len(self.peers))]
def get_remote_service(self, peerid, name):
class FakeClient2:
nodeid = "fakeclient"
- def __init__(self, max_peers):
+ def __init__(self, num_peers):
self.peers = []
- for peerid in range(max_peers):
+ for peerid in range(num_peers):
self.peers.append(FakePeer2(str(peerid)))
- def permute_peerids(self, key, max_peers):
- assert max_peers == None
+ def get_permuted_connections(self, key):
return [str(i) for i in range(len(self.peers))]
def get_remote_service(self, peerid, name):
-
from zope.interface import implements
-from twisted.python import failure, log
+from twisted.python import log
from twisted.internet import defer
from twisted.application import service
from foolscap import Referenceable
-from allmydata.util import idlib, bencode, mathutil
-from allmydata.util.idlib import peerid_to_short_string as shortid
-from allmydata.util.deferredutil import DeferredListShouldSucceed
+from allmydata.util import idlib, mathutil
from allmydata import codec
from allmydata.uri import pack_uri
from allmydata.interfaces import IUploadable, IUploader
from cStringIO import StringIO
-import sha
+import collections, random, sha
class NotEnoughPeersError(Exception):
pass
class TooFullError(Exception):
pass
+class PeerTracker:
+    """
+    Tracks one storage peer during an upload: the connection used to reach
+    it and the bucket writers it has handed out so far.
+    """
+    def __init__(self, peerid, connection, sharesize, blocksize, verifierid):
+        self.peerid = peerid
+        self.connection = connection
+        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
+        self.sharesize = sharesize
+        self.blocksize = blocksize
+        self.verifierid = verifierid
+
+    def query(self, sharenums):
+        """
+        Ask the peer to allocate buckets for the given share numbers.
+
+        @return: a Deferred that fires with what _got_reply returns
+        """
+        # The attribute set in __init__ is 'verifierid'; the original read
+        # the non-existent 'self._verifierid' here.
+        d = self.connection.callRemote("allocate_buckets", self.verifierid,
+                                       sharenums, self.sharesize,
+                                       self.blocksize, canary=Referenceable())
+        d.addCallback(self._got_reply)
+        return d
+
+    def _got_reply(self, res):
+        """
+        Remember the buckets the peer handed out.
+
+        @param res: the (alreadygot, buckets) pair returned by the peer
+        @return: (alreadygot, set of the sharenums in buckets)
+        """
+        (alreadygot, buckets) = res
+        self.buckets.update(buckets)
+        return (alreadygot, set(buckets.keys()))
class FileUploader:
debug = False
ENCODERCLASS = codec.CRSEncoder
- def __init__(self, peer):
- self._peer = peer
+ def __init__(self, client):
+ self._client = client
- def set_params(self, min_shares, target_goodness, max_shares):
- self.min_shares = min_shares
- self.target_goodness = target_goodness
- self.max_shares = max_shares
+ def set_params(self, needed_shares, shares_of_happiness, total_shares):
+ self.needed_shares = needed_shares
+ self.shares_of_happiness = shares_of_happiness
+ self.total_shares = total_shares
def set_filehandle(self, filehandle):
self._filehandle = filehandle
log.msg("starting upload [%s]" % (idlib.b2a(self._verifierid),))
if self.debug:
print "starting upload"
- assert self.min_shares
- assert self.target_goodness
+ assert self.needed_shares
# create the encoder, so we can know how large the shares will be
- total_shares = self.max_shares
- needed_shares = self.min_shares
self._encoder = self.ENCODERCLASS()
self._codec_name = self._encoder.get_encoder_type()
- self._needed_shares = needed_shares
- paddedsize = self._size + mathutil.pad_size(self._size, needed_shares)
- self._encoder.set_params(paddedsize, needed_shares, total_shares)
+ paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
+ self._encoder.set_params(paddedsize, self.needed_shares, self.total_shares)
self._share_size = self._encoder.get_share_size()
# first step: who should we upload to?
-
- # We will talk to at most max_peers (which can be None to mean no
- # limit). Maybe limit max_peers to 2*len(self.shares), to reduce
- # memory footprint. For now, make it unlimited.
- max_peers = None
-
- self.permuted = self._peer.permute_peerids(self._verifierid, max_peers)
- self.peers_who_said_yes = []
- self.peers_who_said_no = []
- self.peers_who_had_errors = []
-
- self._total_peers = len(self.permuted)
- for p in self.permuted:
- assert isinstance(p, str)
- # we will shrink self.permuted as we give up on peers
-
- d = defer.maybeDeferred(self._find_peers)
- d.addCallback(self._got_enough_peers)
+ peers = self._client.get_permuted_peers(self._verifierid)
+ assert peers
+ trackers = [ (permutedid, PeerTracker(peerid, conn),)
+ for permutedid, peerid, conn in peers ]
+ ring_things = [] # a list of (position_in_ring, whatami, x) where whatami is 0 if x is a sharenum or else 1 if x is a PeerTracker instance
+ ring_things.extend([ (permutedpeerid, 1, peer,) for permutedpeerid, peer in trackers ])
+ shares = [ (i * 2**160 / self.total_shares, 0, i) for i in range(self.total_shares) ]
+ ring_things.extend(shares)
+ ring_things.sort()
+ self.ring_things = collections.deque(ring_things)
+ self.usable_peers = set([peer for permutedid, peer in trackers])
+ self.used_peers = set()
+ self.unallocated_sharenums = set(shares)
+
+ d = self._locate_all_shareholders()
+ d.addCallback(self._send_shares)
d.addCallback(self._compute_uri)
return d
- def _compute_uri(self, params):
- return pack_uri(self._codec_name, params, self._verifierid)
-
- def _build_not_enough_peers_error(self):
- yes = ",".join([shortid(p) for p in self.peers_who_said_yes])
- no = ",".join([shortid(p) for p in self.peers_who_said_no])
- err = ",".join([shortid(p) for p in self.peers_who_had_errors])
- msg = ("%s goodness, want %s, have %d "
- "landlords, %d total peers, "
- "peers:yes=%s;no=%s;err=%s" %
- (self.goodness_points, self.target_goodness,
- len(self.landlords), self._total_peers,
- yes, no, err))
- return msg
-
- def _find_peers(self):
- # this returns a Deferred which fires (with a meaningless value) when
- # enough peers are found, or errbacks with a NotEnoughPeersError if
- # not.
- self.peer_index = 0
- self.goodness_points = 0
- self.landlords = [] # list of (peerid, bucket_num, remotebucket)
- return self._check_next_peer()
-
- def _check_next_peer(self):
- if self.debug:
- log.msg("FileUploader._check_next_peer: %d permuted, %d goodness"
- " (want %d), have %d landlords, %d total peers" %
- (len(self.permuted), self.goodness_points,
- self.target_goodness, len(self.landlords),
- self._total_peers))
- if (self.goodness_points >= self.target_goodness and
- len(self.landlords) >= self.min_shares):
- if self.debug: print " we're done!"
- return "done"
- if not self.permuted:
- # we've run out of peers to check without finding enough, which
- # means we won't be able to upload this file. Bummer.
- msg = self._build_not_enough_peers_error()
- log.msg("NotEnoughPeersError: %s" % msg)
- raise NotEnoughPeersError(msg)
-
- # otherwise we use self.peer_index to rotate through all the usable
- # peers. It gets inremented elsewhere, but wrapped here.
- if self.peer_index >= len(self.permuted):
- self.peer_index = 0
-
- peerid = self.permuted[self.peer_index]
-
- d = self._check_peer(peerid)
- d.addCallback(lambda res: self._check_next_peer())
- return d
-
- def _check_peer(self, peerid):
- # contact a single peer, and ask them to hold a share. If they say
- # yes, we update self.landlords and self.goodness_points, and
- # increment self.peer_index. If they say no, or are uncontactable, we
- # remove them from self.permuted. This returns a Deferred which never
- # errbacks.
-
- bucket_num = len(self.landlords)
- d = self._peer.get_remote_service(peerid, "storageserver")
- def _got_peer(service):
- if self.debug: print "asking %s" % shortid(peerid)
- d2 = service.callRemote("allocate_bucket",
- verifierid=self._verifierid,
- bucket_num=bucket_num,
- size=self._share_size,
- leaser=self._peer.nodeid,
- canary=Referenceable())
- return d2
- d.addCallback(_got_peer)
-
- def _allocate_response(bucket):
- if self.debug:
- print " peerid %s will grant us a lease" % shortid(peerid)
- self.peers_who_said_yes.append(peerid)
- self.landlords.append( (peerid, bucket_num, bucket) )
- self.goodness_points += 1
- self.peer_index += 1
-
- d.addCallback(_allocate_response)
-
- def _err(f):
- if self.debug: print "err from peer %s:" % idlib.b2a(peerid)
- assert isinstance(f, failure.Failure)
- if f.check(TooFullError):
- if self.debug: print " too full"
- self.peers_who_said_no.append(peerid)
- elif f.check(IndexError):
- if self.debug: print " no connection"
- self.peers_who_had_errors.append(peerid)
- else:
- if self.debug: print " other error:", f
- self.peers_who_had_errors.append(peerid)
- log.msg("FileUploader._check_peer(%s): err" % shortid(peerid))
- log.msg(f)
- self.permuted.remove(peerid) # this peer was unusable
- return None
- d.addErrback(_err)
- return d
-
- def _got_enough_peers(self, res):
- landlords = self.landlords
- if self.debug:
- log.msg("FileUploader._got_enough_peers")
- log.msg(" %d landlords" % len(landlords))
- if len(landlords) < 20:
- log.msg(" peerids: %s" % " ".join([idlib.b2a(l[0])
- for l in landlords]))
- log.msg(" buckets: %s" % " ".join([str(l[1])
- for l in landlords]))
- # assign shares to landlords
- self.sharemap = {}
- for peerid, bucket_num, bucket in landlords:
- self.sharemap[bucket_num] = bucket
- # the sharemap should have exactly len(landlords) shares, with
- # no holes
- assert sorted(self.sharemap.keys()) == range(len(landlords))
- # encode all the data at once: this class does not use segmentation
- data = self._filehandle.read()
-
- # xyz i am about to go away anyway.
- chunksize = mathutil.div_ceil(len(data), self._needed_shares)
- numchunks = mathutil.div_ceil(len(data), chunksize)
- l = [ data[i:i+chunksize] for i in range(0, len(data), chunksize) ]
- # padding
- if len(l[-1]) != len(l[0]):
- l[-1] = l[-1] + ('\x00'*(len(l[0])-len(l[-1])))
- d = self._encoder.encode(l, self.sharemap.keys())
- d.addCallback(self._send_all_shares)
- d.addCallback(lambda res: self._encoder.get_serialized_params())
+    def _locate_all_shareholders(self):
+        """
+        Query the peers, one round after another, until every share has a
+        home or no further progress can be made.
+
+        @return: a Deferred that fires with the set of PeerTracker instances
+                 that have agreed to hold some shares for us. It errbacks
+                 with NotEnoughPeersError when fewer than
+                 shares_of_happiness shares could be placed.
+        """
+        def _snapshot():
+            # measure of progress: a round that changes neither number
+            # cannot be improved on by asking again
+            return (len(self.unallocated_sharenums), len(self.usable_peers))
+
+        def _next_round():
+            before = _snapshot()
+            d = self._query_peers()
+            d.addCallback(_done, before)
+            return d
+
+        def _done(res, before):
+            if not self.unallocated_sharenums:
+                return self.used_peers
+            if self.usable_peers and _snapshot() != before:
+                # Shares are left over, peers are left over and the last
+                # round changed something: ask again and re-evaluate the
+                # outcome, instead of handing the raw query results on to
+                # the next step.
+                return _next_round()
+            # "at least shares_of_happiness placed" is the same as "at most
+            # total_shares - shares_of_happiness unplaced".
+            if len(self.unallocated_sharenums) <= (self.total_shares - self.shares_of_happiness):
+                # close enough
+                return self.used_peers
+            raise NotEnoughPeersError
+
+        return _next_round()
- def _send_one_share(self, bucket, sharedata, metadata):
- d = bucket.callRemote("write", sharedata)
- d.addCallback(lambda res:
- bucket.callRemote("set_metadata", metadata))
- d.addCallback(lambda res:
- bucket.callRemote("close"))
- return d
+    def _query_peers(self):
+        """
+        Make one pass around the ring. Every share is requested from the
+        peer that most recently precedes it in ring order, and every peer is
+        queried exactly once (with an empty set if no share follows it).
+
+        @return: a deferred that fires when all queries have resolved
+        """
+        # Choose a random starting point.
+        self.ring_things.rotate(random.randrange(0, len(self.ring_things)))
+
+        # Rotate until a peer sits at the head of the ring. We know that
+        # we'll eventually find one because we earlier asserted that there
+        # was at least one.
+        while self.ring_things[0][1] != 1:
+            self.ring_things.rotate(-1)
+
+        outstanding_queries = []
+        def _ask(peer, sharenums):
+            d = peer.query(sharenums)
+            d.addCallbacks(self._got_response, self._got_error,
+                           callbackArgs=(peer, sharenums), errbackArgs=(peer,))
+            outstanding_queries.append(d)
+
+        # Walk a snapshot of the ring exactly once. The previous
+        # rotate-until-we-see-the-starting-point loop only terminated when
+        # no share followed the last peer, and in that case it never queried
+        # that last peer.
+        peer = None
+        sharenums_to_query = set()
+        for position, whatami, thing in list(self.ring_things):
+            if whatami == 1:
+                if peer is not None:
+                    _ask(peer, sharenums_to_query)
+                assert isinstance(thing, PeerTracker), thing
+                peer = thing
+                sharenums_to_query = set()
+            else:
+                sharenums_to_query.add(thing)
+        # the last peer gets the shares between it and the end of the walk
+        _ask(peer, sharenums_to_query)
+
+        return defer.DeferredList(outstanding_queries)
+
+    def _got_response(self, res, peer, shares_we_requested):
+        """
+        Record the outcome of one successful query.
+
+        @param res: the pair (alreadygot, allocated)
+        @type alreadygot: a set of sharenums
+        @type allocated: a set of sharenums
+        @param peer: the PeerTracker that answered
+        @param shares_we_requested: the set of sharenums we asked it for
+        """
+        (alreadygot, allocated) = res
+        self.unallocated_sharenums -= alreadygot
+        self.unallocated_sharenums -= allocated
+
+        if allocated:
+            self.used_peers.add(peer)
+
+        if shares_we_requested - alreadygot - allocated:
+            # Then he didn't accept some of the shares, so he's full.
+            # discard() rather than remove(): the peer may already have
+            # been dropped after an earlier query.
+            self.usable_peers.discard(peer)
+
+    def _got_error(self, f, peer):
+        """
+        Errback for a failed query: stop considering the peer usable. The
+        failure f is deliberately consumed so that one bad peer does not
+        abort the whole round.
+        """
+        # 'usable_peers -= peer' raises TypeError because peer is a single
+        # PeerTracker, not a set.
+        self.usable_peers.discard(peer)
+
+    def _send_shares(self, used_peers):
+        """
+        Merge the buckets of all used peers into one dict, hand it to the
+        encoder as its shareholders and start the encoder.
+
+        @return: whatever self._encoder.start() returns
+        """
+        merged = {}
+        expected_count = 0
+        for tracker in used_peers:
+            expected_count += len(tracker.buckets)
+            merged.update(tracker.buckets)
+        # no sharenum may have been handed out by two different peers
+        assert len(merged) == expected_count
+        self._encoder.set_shareholders(merged)
+        return self._encoder.start()
+
+    def _compute_uri(self, roothash):
+        """
+        Pack the codec name, the encoder's serialized parameters, the
+        verifierid and the given root hash into a URI with pack_uri.
+        """
+        return pack_uri(self._codec_name,
+                        self._encoder.get_serialized_params(),
+                        self._verifierid,
+                        roothash)
- def _send_all_shares(self, (shares, shareids)):
- dl = []
- for (shareid, share) in zip(shareids, shares):
- if self.debug:
- log.msg(" writing share %d" % shareid)
- metadata = bencode.bencode(shareid)
- assert len(share) == self._share_size
- assert isinstance(share, str)
- bucket = self.sharemap[shareid]
- d = self._send_one_share(bucket, share, metadata)
- dl.append(d)
- return DeferredListShouldSucceed(dl)
def netstring(s):
return "%d:%s," % (len(s), s)
uploader_class = FileUploader
debug = False
+ needed_shares = 25 # Number of shares required to reconstruct a file.
+ desired_shares = 75 # We will abort an upload unless we can allocate space for at least this many.
+ total_shares = 100 # Total number of shares created by encoding. If everybody has room then this is how many we will upload.
+
def _compute_verifierid(self, f):
hasher = sha.new(netstring("allmydata_v1_verifierid"))
f.seek(0)
u.set_filehandle(fh)
# push two shares, require that we get two back. TODO: this is
# temporary, of course.
- u.set_params(2, 2, 4)
+ u.set_params(self.needed_shares, self.desired_shares, self.total_shares)
u.set_verifierid(self._compute_verifierid(fh))
d = u.start()
def _done(res):
--- /dev/null
+
+def distance(p1, p2, FULL = 2**160, HALF = 2**159):
+    """
+    Distance from p1 to p2 on the ring, expressed as a long: p2 - p1,
+    wrapped around by FULL when p2 lies before p1. HALF is accepted but not
+    used.
+
+    @param p1: long of first point
+    @param p2: long of second point
+    """
+    delta = p2 - p1
+    if delta >= 0:
+        return delta
+    return delta + FULL
+